Make OIO/NIO/EPOLL autoReadClear consistent

Motivation:
OIO/NIO use a volatile variable to track if a read is pending. EPOLL does not use a volatile an executes a Runnable on the event loop thread to set readPending to false. These mechansims should be consistent, and not using a volatile variable is preferable because the variable is written to frequently in the event loop thread.
OIO also does not set readPending to false before each fireChannelRead operation and may result in reading more data than the user desires.

Modifications:
- OIO/NIO should not use a volatile variable for readPending
- OIO should set readPending to false before each fireChannelRead

Result:
OIO/NIO/EPOLL are more consistent w.r.t. readPending and volatile variable operations are reduced
Fixes https://github.com/netty/netty/issues/5069
This commit is contained in:
Scott Mitchell 2016-04-05 10:13:18 -07:00
parent 7b121c26ae
commit 5b48fc284e
29 changed files with 902 additions and 691 deletions

View File

@ -17,10 +17,16 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
@ -28,174 +34,193 @@ import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SocketAutoReadTest extends AbstractSocketTest {
private static final Random random = new Random();
static final byte[] data = new byte[1024];
static {
random.nextBytes(data);
}
// See https://github.com/netty/netty/pull/2375
@Test(timeout = 30000)
public void testAutoReadDisableOutsideChannelRead() throws Throwable {
@Test
public void testAutoReadOffDuringReadOnlyReadsOneTime() throws Throwable {
run();
}
public void testAutoReadDisableOutsideChannelRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
TestHandler sh = new TestHandler() {
private boolean allBytesReceived;
public void testAutoReadOffDuringReadOnlyReadsOneTime(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testAutoReadOffDuringReadOnlyReadsOneTime(true, sb, cb);
testAutoReadOffDuringReadOnlyReadsOneTime(false, sb, cb);
}
private void testAutoReadOffDuringReadOnlyReadsOneTime(boolean readOutsideEventLoopThread,
ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
try {
AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
sb.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.AUTO_READ, true)
.childOption(ChannelOption.AUTO_READ, true)
// We want to ensure that we attempt multiple individual read operations per read loop so we can
// test the auto read feature being turned off when data is first read.
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
.childHandler(serverInitializer);
serverChannel = sb.bind().syncUninterruptibly().channel();
cb.remoteAddress(serverChannel.localAddress())
.option(ChannelOption.AUTO_READ, true)
// We want to ensure that we attempt multiple individual read operations per read loop so we can
// test the auto read feature being turned off when data is first read.
.option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
.handler(clientInitializer);
clientChannel = cb.connect().syncUninterruptibly().channel();
// 3 bytes means 3 independent reads for TestRecvByteBufAllocator
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
serverInitializer.autoReadHandler.assertSingleRead();
// 3 bytes means 3 independent reads for TestRecvByteBufAllocator
serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
clientInitializer.autoReadHandler.assertSingleRead();
if (readOutsideEventLoopThread) {
serverInitializer.channel.read();
}
serverInitializer.autoReadHandler.assertSingleReadSecondTry();
if (readOutsideEventLoopThread) {
clientChannel.read();
}
clientInitializer.autoReadHandler.assertSingleReadSecondTry();
} finally {
if (clientChannel != null) {
clientChannel.close().sync();
}
if (serverChannel != null) {
serverChannel.close().sync();
}
}
}
private static class AutoReadInitializer extends ChannelInitializer<Channel> {
final AutoReadHandler autoReadHandler;
volatile Channel channel;
AutoReadInitializer(boolean readInEventLoop) {
autoReadHandler = new AutoReadHandler(readInEventLoop);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
assertFalse(allBytesReceived);
ctx.writeAndFlush(msg);
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
ctx.channel().config().setAutoRead(false);
allBytesReceived = true;
}
});
}
};
sb.childHandler(sh);
TestHandler ch = new TestHandler();
cb.handler(ch);
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
Thread.sleep(500);
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
Thread.sleep(500);
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
Thread.sleep(500);
cc.close().sync();
sc.close().sync();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
protected void initChannel(Channel ch) throws Exception {
channel = ch;
ch.pipeline().addLast(autoReadHandler);
}
}
// See https://github.com/netty/netty/pull/2375
@Test(timeout = 30000)
public void testAutoReadDisableOutsideChannelReadManualRead() throws Throwable {
run();
}
public void testAutoReadDisableOutsideChannelReadManualRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
ServerTestHandler sh = new ServerTestHandler();
sb.childHandler(sh);
TestHandler ch = new TestHandler();
cb.handler(ch);
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
Thread.sleep(500);
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
Thread.sleep(500);
cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync();
Thread.sleep(500);
sh.await();
cc.close().sync();
sc.close().sync();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
public static class ServerTestHandler extends TestHandler {
enum State {
AUTO_READ,
SCHEDULED,
BYTES_RECEIVED,
READ_SCHEDULED
}
private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
private final AtomicInteger count = new AtomicInteger();
private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch latch2;
private final boolean callRead;
private State state = State.AUTO_READ;
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(msg);
switch (state) {
case READ_SCHEDULED:
latch.countDown();
break;
case AUTO_READ:
state = State.SCHEDULED;
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
ctx.channel().config().setAutoRead(false);
state = State.BYTES_RECEIVED;
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
state = State.READ_SCHEDULED;
ctx.channel().read();
}
}, 2, TimeUnit.SECONDS);
}
});
break;
case BYTES_RECEIVED:
// Once the state is BYTES_RECEIVED we should not receive anymore data.
fail();
break;
case SCHEDULED:
// nothing to do
break;
}
}
public void await() throws InterruptedException {
latch.await();
}
}
private static class TestHandler extends ChannelInboundHandlerAdapter {
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
cause.printStackTrace();
ctx.close();
}
AutoReadHandler(boolean callRead) {
this.callRead = callRead;
latch2 = new CountDownLatch(callRead ? 3 : 2);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
if (count.incrementAndGet() == 1) {
ctx.channel().config().setAutoRead(false);
}
if (callRead) {
// Test calling read in the EventLoop thread to ensure a read is eventually done.
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
latch2.countDown();
}
void assertSingleRead() throws InterruptedException {
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(count.get() > 0);
}
void assertSingleReadSecondTry() throws InterruptedException {
assertTrue(latch2.await(5, TimeUnit.SECONDS));
assertEquals(callRead ? 3 : 2, count.get());
}
}
/**
* Designed to keep reading as long as autoread is enabled.
*/
private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
@Override
public Handle newHandle() {
return new Handle() {
private ChannelConfig config;
private int attemptedBytesRead;
private int lastBytesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess(), guess());
}
@Override
public int guess() {
return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
}
@Override
public void reset(ChannelConfig config) {
this.config = config;
}
@Override
public void incMessagesRead(int numMessages) {
// No need to track the number of messages read because it is not used.
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public boolean continueReading() {
return config.isAutoRead();
}
@Override
public void readComplete() {
// Nothing needs to be done or adjusted after each read cycle is completed.
}
};
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SocketExceptionHandlingTest extends AbstractSocketTest {
@Test
public void testReadPendingIsResetAfterEachRead() throws Throwable {
run();
}
public void testReadPendingIsResetAfterEachRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
try {
MyInitializer serverInitializer = new MyInitializer();
sb.option(ChannelOption.SO_BACKLOG, 1024);
sb.childHandler(serverInitializer);
serverChannel = sb.bind().syncUninterruptibly().channel();
cb.remoteAddress(serverChannel.localAddress())
.handler(new MyInitializer());
clientChannel = cb.connect().syncUninterruptibly().channel();
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[1024]));
// We expect to get 2 exceptions (1 from BuggyChannelHandler and 1 from ExceptionHandler).
assertTrue(serverInitializer.exceptionHandler.latch1.await(5, TimeUnit.SECONDS));
// After we get the first exception, we should get no more, this is expected to timeout.
assertFalse("Encountered " + serverInitializer.exceptionHandler.count.get() +
" exceptions when 1 was expected",
serverInitializer.exceptionHandler.latch2.await(1, TimeUnit.SECONDS));
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
}
}
private static class MyInitializer extends ChannelInitializer<Channel> {
final ExceptionHandler exceptionHandler = new ExceptionHandler();
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new BuggyChannelHandler());
pipeline.addLast(exceptionHandler);
}
}
private static class BuggyChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
throw new NullPointerException("I am a bug!");
}
}
private static class ExceptionHandler extends ChannelInboundHandlerAdapter {
final AtomicLong count = new AtomicLong();
/**
* We expect to get 1 call to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}.
*/
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (count.incrementAndGet() <= 2) {
latch1.countDown();
} else {
latch2.countDown();
}
// This should not throw any exception.
ctx.close();
}
}
}

View File

@ -0,0 +1,199 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SocketReadPendingTest extends AbstractSocketTest {
@Test
public void testReadPendingIsResetAfterEachRead() throws Throwable {
run();
}
public void testReadPendingIsResetAfterEachRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
Channel serverChannel = null;
Channel clientChannel = null;
try {
ReadPendingInitializer serverInitializer = new ReadPendingInitializer();
ReadPendingInitializer clientInitializer = new ReadPendingInitializer();
sb.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.AUTO_READ, true)
.childOption(ChannelOption.AUTO_READ, false)
// We intend to do 2 reads per read loop wakeup
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2))
.childHandler(serverInitializer);
serverChannel = sb.bind().syncUninterruptibly().channel();
cb.remoteAddress(serverChannel.localAddress())
.option(ChannelOption.AUTO_READ, false)
// We intend to do 2 reads per read loop wakeup
.option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2))
.handler(clientInitializer);
clientChannel = cb.connect().syncUninterruptibly().channel();
// 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4]));
// 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator
assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS));
serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4]));
serverInitializer.channel.read();
serverInitializer.readPendingHandler.assertAllRead();
clientChannel.read();
clientInitializer.readPendingHandler.assertAllRead();
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
}
}
private static class ReadPendingInitializer extends ChannelInitializer<Channel> {
final ReadPendingReadHandler readPendingHandler = new ReadPendingReadHandler();
final CountDownLatch channelInitLatch = new CountDownLatch(1);
volatile Channel channel;
@Override
protected void initChannel(Channel ch) throws Exception {
channel = ch;
ch.pipeline().addLast(readPendingHandler);
channelInitLatch.countDown();
}
}
private static final class ReadPendingReadHandler extends ChannelInboundHandlerAdapter {
private final AtomicInteger count = new AtomicInteger();
private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch latch2 = new CountDownLatch(2);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
if (count.incrementAndGet() == 1) {
// Call read the first time, to ensure it is not reset the second time.
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
latch2.countDown();
}
void assertAllRead() throws InterruptedException {
assertTrue(latch.await(5, TimeUnit.SECONDS));
// We should only do 1 read loop, because we only called read() on the first channelRead.
assertFalse(latch2.await(1, TimeUnit.SECONDS));
assertEquals(2, count.get());
}
}
/**
* Designed to keep reading as long as autoread is enabled.
*/
private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
private final int numReads;
TestNumReadsRecvByteBufAllocator(int numReads) {
this.numReads = numReads;
}
@Override
public Handle newHandle() {
return new Handle() {
private int attemptedBytesRead;
private int lastBytesRead;
private int numMessagesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess(), guess());
}
@Override
public int guess() {
return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
}
@Override
public void reset(ChannelConfig config) {
numMessagesRead = 0;
}
@Override
public void incMessagesRead(int numMessages) {
numMessagesRead += numMessages;
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public boolean continueReading() {
return numMessagesRead < numReads;
}
@Override
public void readComplete() {
// Nothing needs to be done or adjusted after each read cycle is completed.
}
};
}
}
}

View File

@ -52,7 +52,7 @@ public class SocketStringEchoTest extends AbstractSocketTest {
}
}
@Test
@Test(timeout = 30000)
public void testStringEcho() throws Throwable {
run();
}
@ -61,7 +61,7 @@ public class SocketStringEchoTest extends AbstractSocketTest {
testStringEcho(sb, cb, true);
}
@Test
@Test(timeout = 30000)
public void testStringEchoNotAutoRead() throws Throwable {
run();
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketAutoReadTest;
import java.util.List;
public class EpollETSocketAutoReadTest extends SocketAutoReadTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketExceptionHandlingTest;
import java.util.List;
public class EpollETSocketExceptionHandlingTest extends SocketExceptionHandlingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketReadPendingTest;
import java.util.List;
public class EpollETSocketReadPendingTest extends SocketReadPendingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketAutoReadTest;
import java.util.List;
public class EpollLTSocketAutoReadTest extends SocketAutoReadTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketExceptionHandlingTest;
import java.util.List;
public class EpollLTSocketExceptionHandlingTest extends SocketExceptionHandlingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketReadPendingTest;
import java.util.List;
public class EpollLTSocketReadPendingTest extends SocketReadPendingTest {
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.socket();
}
@Override
protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) {
super.configure(bootstrap, bootstrap2, allocator);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
}
}

View File

@ -17,18 +17,14 @@ package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.util.ReferenceCountUtil;
import org.junit.Assert;
@ -38,10 +34,8 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -120,471 +114,4 @@ public class EpollSocketChannelTest {
Assert.assertTrue(info.rcvSpace() >= 0);
Assert.assertTrue(info.totalRetrans() >= 0);
}
@Test
public void testExceptionHandlingDoesNotInfiniteLoop() throws InterruptedException {
EventLoopGroup group = new EpollEventLoopGroup();
try {
runExceptionHandleFeedbackLoop(group, EpollServerSocketChannel.class, EpollSocketChannel.class,
new InetSocketAddress(0));
runExceptionHandleFeedbackLoop(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class,
EpollSocketTestPermutation.newSocketAddress());
} finally {
group.shutdownGracefully();
}
}
@Test
public void testAutoReadOffDuringReadOnlyReadsOneTime() throws InterruptedException {
EventLoopGroup group = new EpollEventLoopGroup();
try {
runAutoReadTest(true, group, EpollServerSocketChannel.class,
EpollSocketChannel.class, new InetSocketAddress(0));
runAutoReadTest(true, group, EpollServerDomainSocketChannel.class,
EpollDomainSocketChannel.class, EpollSocketTestPermutation.newSocketAddress());
runAutoReadTest(false, group, EpollServerSocketChannel.class,
EpollSocketChannel.class, new InetSocketAddress(0));
runAutoReadTest(false, group, EpollServerDomainSocketChannel.class,
EpollDomainSocketChannel.class, EpollSocketTestPermutation.newSocketAddress());
} finally {
group.shutdownGracefully();
}
}
@Test
public void testReadPendingIsResetAfterEachRead() throws InterruptedException {
EventLoopGroup group = new EpollEventLoopGroup();
try {
runReadPendingTest(group, EpollServerSocketChannel.class, EpollSocketChannel.class,
new InetSocketAddress(0));
runReadPendingTest(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class,
EpollSocketTestPermutation.newSocketAddress());
} finally {
group.shutdownGracefully();
}
}
private void runAutoReadTest(boolean readOutsideEventLoopThread, EventLoopGroup group,
Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr)
throws InterruptedException {
Channel serverChannel = null;
Channel clientChannel = null;
try {
AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024)
.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(ChannelOption.AUTO_READ, true)
.group(group)
.channel(serverChannelClass)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(ChannelOption.AUTO_READ, true)
// We want to ensure that we attempt multiple individual read operations per read loop so we can
// test the auto read feature being turned off when data is first read.
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
.childHandler(serverInitializer);
serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel();
Bootstrap b = new Bootstrap()
.group(group)
.channel(channelClass)
.remoteAddress(serverChannel.localAddress())
.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(ChannelOption.AUTO_READ, true)
// We want to ensure that we attempt multiple individual read operations per read loop so we can
// test the auto read feature being turned off when data is first read.
.option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
.handler(clientInitializer);
clientChannel = b.connect().syncUninterruptibly().channel();
// 3 bytes means 3 independent reads for TestRecvByteBufAllocator
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
serverInitializer.autoReadHandler.assertSingleRead();
// 3 bytes means 3 independent reads for TestRecvByteBufAllocator
serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
clientInitializer.autoReadHandler.assertSingleRead();
if (readOutsideEventLoopThread) {
serverInitializer.channel.read();
}
serverInitializer.autoReadHandler.assertSingleReadSecondTry();
if (readOutsideEventLoopThread) {
clientChannel.read();
}
clientInitializer.autoReadHandler.assertSingleReadSecondTry();
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
}
}
private void runReadPendingTest(EventLoopGroup group,
Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr)
throws InterruptedException {
Channel serverChannel = null;
Channel clientChannel = null;
try {
ReadPendingInitializer serverInitializer = new ReadPendingInitializer();
ReadPendingInitializer clientInitializer = new ReadPendingInitializer();
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024)
.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(ChannelOption.AUTO_READ, true)
.group(group)
.channel(serverChannelClass)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(ChannelOption.AUTO_READ, false)
// We intend to do 2 reads per read loop wakeup
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2))
.childHandler(serverInitializer);
serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel();
Bootstrap b = new Bootstrap()
.group(group)
.channel(channelClass)
.remoteAddress(serverChannel.localAddress())
.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(ChannelOption.AUTO_READ, false)
// We intend to do 2 reads per read loop wakeup
.option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2))
.handler(clientInitializer);
clientChannel = b.connect().syncUninterruptibly().channel();
// 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4]));
// 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator
assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS));
serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4]));
serverInitializer.channel.read();
serverInitializer.readPendingHandler.assertAllRead();
clientChannel.read();
clientInitializer.readPendingHandler.assertAllRead();
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
}
}
private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr) throws InterruptedException {
Channel serverChannel = null;
Channel clientChannel = null;
try {
MyInitializer serverInitializer = new MyInitializer();
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024);
sb.group(group)
.channel(serverChannelClass)
.childHandler(serverInitializer);
serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel();
Bootstrap b = new Bootstrap();
b.group(group);
b.channel(channelClass);
b.remoteAddress(serverChannel.localAddress());
b.handler(new MyInitializer());
clientChannel = b.connect().syncUninterruptibly().channel();
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[1024]));
// We expect to get 2 exceptions (1 from BuggyChannelHandler and 1 from ExceptionHandler).
assertTrue(serverInitializer.exceptionHandler.latch1.await(2, TimeUnit.SECONDS));
// After we get the first exception, we should get no more, this is expected to timeout.
assertFalse("Encountered " + serverInitializer.exceptionHandler.count.get() +
" exceptions when 1 was expected",
serverInitializer.exceptionHandler.latch2.await(2, TimeUnit.SECONDS));
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
}
}
/**
* Designed to keep reading as long as autoread is enabled.
*/
private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
private final int numReads;
TestNumReadsRecvByteBufAllocator(int numReads) {
this.numReads = numReads;
}
@Override
public Handle newHandle() {
return new Handle() {
private ChannelConfig config;
private int attemptedBytesRead;
private int lastBytesRead;
private int numMessagesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
@Override
public int guess() {
return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
}
@Override
public void reset(ChannelConfig config) {
this.config = config;
numMessagesRead = 0;
}
@Override
public void incMessagesRead(int numMessages) {
numMessagesRead += numMessages;
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public boolean continueReading() {
return numMessagesRead < numReads;
}
@Override
public void readComplete() {
}
};
}
}
/**
* Designed to keep reading as long as autoread is enabled.
*/
private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
@Override
public Handle newHandle() {
return new Handle() {
private ChannelConfig config;
private int attemptedBytesRead;
private int lastBytesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
@Override
public int guess() {
return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
}
@Override
public void reset(ChannelConfig config) {
this.config = config;
}
@Override
public void incMessagesRead(int numMessages) {
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public boolean continueReading() {
return config.isAutoRead();
}
@Override
public void readComplete() {
}
};
}
}
private static class AutoReadInitializer extends ChannelInitializer<Channel> {
final AutoReadHandler autoReadHandler;
volatile Channel channel;
AutoReadInitializer(boolean readInEventLoop) {
autoReadHandler = new AutoReadHandler(readInEventLoop);
}
@Override
protected void initChannel(Channel ch) throws Exception {
channel = ch;
ch.pipeline().addLast(autoReadHandler);
}
}
private static class ReadPendingInitializer extends ChannelInitializer<Channel> {
final ReadPendingReadHandler readPendingHandler = new ReadPendingReadHandler();
final CountDownLatch channelInitLatch = new CountDownLatch(1);
volatile Channel channel;
@Override
protected void initChannel(Channel ch) throws Exception {
channel = ch;
ch.pipeline().addLast(readPendingHandler);
channelInitLatch.countDown();
}
}
private static class MyInitializer extends ChannelInitializer<Channel> {
final ExceptionHandler exceptionHandler = new ExceptionHandler();
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new BuggyChannelHandler());
pipeline.addLast(exceptionHandler);
}
}
private static class BuggyChannelHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
throw new NullPointerException("I am a bug!");
}
}
private static final class ReadPendingReadHandler extends ChannelInboundHandlerAdapter {
private final AtomicInteger count = new AtomicInteger();
private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch latch2 = new CountDownLatch(2);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
if (count.incrementAndGet() == 1) {
// Call read the first time, to ensure it is not reset the second time.
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
latch2.countDown();
}
void assertAllRead() throws InterruptedException {
assertTrue(latch.await(5, TimeUnit.SECONDS));
// We should only do 1 read loop, because we only called read() on the first channelRead.
assertFalse(latch2.await(1, TimeUnit.SECONDS));
assertEquals(2, count.get());
}
}
private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
private final AtomicInteger count = new AtomicInteger();
private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch latch2;
private final boolean callRead;
AutoReadHandler(boolean callRead) {
this.callRead = callRead;
latch2 = new CountDownLatch(callRead ? 3 : 2);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
if (count.incrementAndGet() == 1) {
ctx.channel().config().setAutoRead(false);
}
if (callRead) {
// Test calling read in the EventLoop thread to ensure a read is eventually done.
ctx.read();
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
latch2.countDown();
}
void assertSingleRead() throws InterruptedException {
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(count.get() > 0);
}
void assertSingleReadSecondTry() throws InterruptedException {
assertTrue(latch2.await(5, TimeUnit.SECONDS));
assertEquals(callRead ? 3 : 2, count.get());
}
}
private static class ExceptionHandler extends ChannelInboundHandlerAdapter {
final AtomicLong count = new AtomicLong();
/**
* We expect to get 1 call to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}.
*/
final CountDownLatch latch1 = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (count.incrementAndGet() <= 2) {
latch1.countDown();
} else {
latch2.countDown();
}
// This should not throw any exception.
ctx.close();
}
}
}

View File

@ -398,7 +398,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett
@Override
protected void autoReadCleared() {
setReadPending(false);
clearReadPending();
}
}
}

View File

@ -234,7 +234,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel
@Override
protected void autoReadCleared() {
setReadPending(false);
clearReadPending();
}
}
}

View File

@ -464,7 +464,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel
@Override
protected void autoReadCleared() {
setReadPending(false);
clearReadPending();
}
}
}

View File

@ -303,7 +303,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel
@Override
protected void autoReadCleared() {
setReadPending(false);
clearReadPending();
}
}
}

View File

@ -76,7 +76,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
setReadPending(false);
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
@ -93,11 +93,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
@ -118,7 +113,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
}
allocHandle.incMessagesRead(1);
setReadPending(false);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
@ -138,7 +133,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !isReadPending()) {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}

View File

@ -61,7 +61,13 @@ public abstract class AbstractNioChannel extends AbstractChannel {
protected final int readInterestOp;
volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;
private volatile boolean readPending;
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
readPending = false;
}
};
/**
* The future of the current connection attempt. If not null, subsequent
@ -125,12 +131,53 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return selectionKey;
}
/**
* @deprecated No longer supported.
* No longer supported.
*/
@Deprecated
protected boolean isReadPending() {
return readPending;
}
protected void setReadPending(boolean readPending) {
/**
* @deprecated Use {@link #clearReadPending()} if appropriate instead.
* No longer supported.
*/
@Deprecated
protected void setReadPending(final boolean readPending) {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
this.readPending = readPending;
} else {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
AbstractNioChannel.this.readPending = readPending;
}
});
}
} else {
this.readPending = readPending;
}
}
/**
* Set read pending to {@code false}.
*/
protected final void clearReadPending() {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
readPending = false;
} else {
eventLoop.execute(clearReadPendingRunnable);
}
} else {
// Best effort if we are not registered yet clear readPending. This happens during channel initialization.
readPending = false;
}
}
/**

View File

@ -54,11 +54,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
@ -85,7 +80,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
setReadPending(false);
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
@ -115,7 +110,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !isReadPending()) {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}

View File

@ -93,7 +93,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
setReadPending(false);
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
@ -110,12 +110,14 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
@Override
protected void doRead() {
final ChannelConfig config = config();
if (isInputShutdown() || !config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
if (isInputShutdown() || !readPending) {
// We have to check readPending here because the Runnable to read could have been scheduled and later
// during the same read loop readPending was set to false.
return;
}
// OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run.
setReadPending(false);
// In OIO we should set readPending to false even if the read was not successful so we can schedule
// another read on the event loop if no reads are done.
readPending = false;
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
@ -123,19 +125,22 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean read = false;
boolean close = false;
boolean readData = false;
try {
byteBuf = allocHandle.allocate(allocator);
do {
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
if (!read) { // nothing was read. release the buffer.
if (!byteBuf.isReadable()) { // nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
}
break;
} else {
readData = true;
}
read = true;
final int available = available();
if (available <= 0) {
@ -148,7 +153,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
final int maxCapacity = byteBuf.maxCapacity();
if (capacity == maxCapacity) {
allocHandle.incMessagesRead(1);
read = false;
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = allocHandle.allocate(allocator);
} else {
@ -162,27 +167,32 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
}
} while (allocHandle.continueReading());
if (read) {
if (byteBuf != null) {
// It is possible we allocated a buffer because the previous one was not writable, but then didn't use
// it because allocHandle.continueReading() returned false.
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
byteBuf = null;
}
if (readData) {
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
if (allocHandle.lastBytesRead() < 0) {
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle);
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (allocHandle.lastBytesRead() == 0 && isActive()) {
// If the read amount was 0 and the channel is still active we need to trigger a new read()
// as otherwise we will never try to read again and the user will never know.
// Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
// able to process the rest of the tasks in the queue first.
//
// See https://github.com/netty/netty/issues/2404
if (readPending || config.isAutoRead() || !readData && isActive()) {
// Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
// should execute read() again because no data may have been read.
read();
}
}

View File

@ -20,6 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.ThreadPerChannelEventLoop;
import io.netty.util.internal.OneTimeTask;
import java.net.SocketAddress;
@ -30,14 +31,19 @@ public abstract class AbstractOioChannel extends AbstractChannel {
protected static final int SO_TIMEOUT = 1000;
private volatile boolean readPending;
boolean readPending;
private final Runnable readTask = new Runnable() {
@Override
public void run() {
doRead();
}
};
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
readPending = false;
}
};
/**
* @see AbstractChannel#AbstractChannel(Channel)
@ -87,21 +93,62 @@ public abstract class AbstractOioChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {
if (isReadPending()) {
if (readPending) {
return;
}
setReadPending(true);
readPending = true;
eventLoop().execute(readTask);
}
protected abstract void doRead();
/**
* @deprecated No longer supported.
* No longer supported.
*/
@Deprecated
protected boolean isReadPending() {
return readPending;
}
protected void setReadPending(boolean readPending) {
/**
* @deprecated Use {@link #clearReadPending()} if appropriate instead.
* No longer supported.
*/
@Deprecated
protected void setReadPending(final boolean readPending) {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
this.readPending = readPending;
} else {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
AbstractOioChannel.this.readPending = readPending;
}
});
}
} else {
this.readPending = readPending;
}
}
/**
* Set read pending to {@code false}.
*/
protected final void clearReadPending() {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
readPending = false;
} else {
eventLoop.execute(clearReadPendingRunnable);
}
} else {
// Best effort if we are not registered yet clear readPending. This happens during channel initialization.
readPending = false;
}
}
}

View File

@ -37,14 +37,16 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
@Override
protected void doRead() {
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
if (!readPending) {
// We have to check readPending here because the Runnable to read could have been scheduled and later
// during the same read loop readPending was set to false.
return;
}
// OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run.
setReadPending(false);
// In OIO we should set readPending to false even if the read was not successful so we can schedule
// another read on the event loop if no reads are done.
readPending = false;
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
@ -69,13 +71,18 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
exception = t;
}
boolean readData = false;
int size = readBuf.size();
if (size > 0) {
readData = true;
for (int i = 0; i < size; i++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
if (exception != null) {
if (exception instanceof IOException) {
@ -89,13 +96,9 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel {
if (isOpen()) {
unsafe().close(unsafe().voidPromise());
}
} else if (allocHandle.lastBytesRead() == 0 && isActive()) {
// If the read amount was 0 and the channel is still active we need to trigger a new read()
// as otherwise we will never try to read again and the user will never know.
// Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
// able to process the rest of the tasks in the queue first.
//
// See https://github.com/netty/netty/issues/2404
} else if (readPending || config.isAutoRead() || !readData && isActive()) {
// Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we
// should execute read() again because no data may have been read.
read();
}
}

View File

@ -579,7 +579,12 @@ public final class NioDatagramChannel
}
@Override
@Deprecated
protected void setReadPending(boolean readPending) {
super.setReadPending(readPending);
}
void clearReadPending0() {
clearReadPending();
}
}

View File

@ -169,7 +169,7 @@ class NioDatagramChannelConfig extends DefaultDatagramChannelConfig {
@Override
protected void autoReadCleared() {
((NioDatagramChannel) channel).setReadPending(false);
((NioDatagramChannel) channel).clearReadPending0();
}
private Object getOption0(Object option) {

View File

@ -191,7 +191,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
@Override
protected void autoReadCleared() {
setReadPending(false);
clearReadPending();
}
}
}

View File

@ -361,7 +361,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override
protected void autoReadCleared() {
setReadPending(false);
clearReadPending();
}
}
}

View File

@ -155,7 +155,7 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan
@Override
protected void autoReadCleared() {
if (channel instanceof OioServerSocketChannel) {
((OioServerSocketChannel) channel).setReadPending(false);
((OioServerSocketChannel) channel).clearReadPending0();
}
}

View File

@ -183,7 +183,7 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im
@Override
protected void autoReadCleared() {
if (channel instanceof OioSocketChannel) {
((OioSocketChannel) channel).setReadPending(false);
((OioSocketChannel) channel).clearReadPending0();
}
}

View File

@ -195,8 +195,13 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
throw new UnsupportedOperationException();
}
@Deprecated
@Override
protected void setReadPending(boolean readPending) {
super.setReadPending(readPending);
}
final void clearReadPending0() {
super.clearReadPending();
}
}

View File

@ -234,8 +234,13 @@ public class OioSocketChannel extends OioByteStreamChannel
return false;
}
@Deprecated
@Override
protected void setReadPending(boolean readPending) {
super.setReadPending(readPending);
}
final void clearReadPending0() {
clearReadPending();
}
}