EPOLL ET AutoRead

Motivation:
EPOLL does not support autoread when in ET mode.

Modifications:
- EpollRecvByteAllocatorHandle should not unconditionally force reading just because ET is enabled
- AbstractEpollChannel and all derived classes which implement epollInReady must support a variable which indicates
there may be more data to read. The variable will be used when read is called to simulate a EPOLL wakeup and call epollInReady if necessary. This will ensure that if we don't read until EAGAIN that we will try to read again and not rely on EPOLL to notify us.

Result:
EPOLL ET supports auto read.
This commit is contained in:
Scott Mitchell 2016-02-27 17:56:41 -08:00
parent bd6040a36e
commit bfbef036a8
9 changed files with 300 additions and 152 deletions

View File

@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
@ -135,11 +136,18 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
}
@Override
protected void doBeginRead() throws Exception {
protected final void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
((AbstractEpollUnsafe) unsafe()).readPending = true;
AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
unsafe.readPending = true;
setFlag(readFlag);
// If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
// again if we didn't consume all the data. So we force a read operation here if there maybe more data.
if (unsafe.maybeMoreDataToRead) {
unsafe.epollInReady();
}
}
final void clearEpollIn() {
@ -299,6 +307,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected boolean readPending;
protected boolean maybeMoreDataToRead;
private EpollRecvByteAllocatorHandle allocHandle;
/**
@ -306,6 +315,22 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
*/
abstract void epollInReady();
final void epollInReadAttempted() {
readPending = maybeMoreDataToRead = false;
}
final void epollInFinally(ChannelConfig config) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn();
}
}
/**
* Will schedule a {@link #epollInReady()} call on the event loop if necessary.
* @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise.
@ -390,7 +415,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
* Create a new {@EpollRecvByteAllocatorHandle} instance.
* @param handle The handle to wrap with EPOLL specific logic.
*/
protected abstract EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle);
EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorHandle(handle, config());
}
@Override
protected void flush0() {

View File

@ -22,7 +22,6 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
@ -104,65 +103,55 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
channelPromise.setFailure(new UnsupportedOperationException());
}
@Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET));
}
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
if (fd().isInputShutdown()) {
return;
}
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelConfig config = config();
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
}
final ChannelPipeline pipeline = pipeline();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
Throwable exception = null;
try {
try {
do {
int socketFd = fd().accept(acceptedAddress);
if (socketFd == -1) {
// lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
// EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
// enabled.
allocHandle.lastBytesRead(fd().accept(acceptedAddress));
epollInReadAttempted();
if (allocHandle.lastBytesRead() == -1) {
// this means everything was handled for now
break;
}
readPending = false;
allocHandle.incMessagesRead(1);
int len = acceptedAddress[0];
pipeline.fireChannelRead(newChildChannel(socketFd, acceptedAddress, 1, len));
pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1, len));
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
allocHandle.readComplete();
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
checkResetEpollIn(edgeTriggered);
checkResetEpollIn(allocHandle.isEdgeTriggered());
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn0();
}
epollInFinally(config);
}
}
}

View File

@ -660,7 +660,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
return super.prepareToClose();
}
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
EpollRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
@ -669,7 +670,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
byteBuf.release();
}
}
recvBufAllocHandle().readComplete();
allocHandle.readComplete();
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
@ -816,8 +818,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
}
@Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorStreamingHandle(handle, isFlagSet(Native.EPOLLET));
EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorStreamingHandle(handle, config());
}
@Override
@ -826,9 +828,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
return;
}
final ChannelConfig config = config();
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
@ -836,7 +839,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
@ -863,6 +865,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
// to handle direct buffers.
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
epollInReadAttempted();
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
@ -870,13 +873,13 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
close = allocHandle.lastBytesRead() < 0;
break;
}
readPending = false;
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
if (close) {
@ -884,18 +887,10 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
checkResetEpollIn(edgeTriggered);
handleReadException(pipeline, byteBuf, t, close, allocHandle);
checkResetEpollIn(allocHandle.isEdgeTriggered());
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn0();
}
epollInFinally(config);
}
}
}

View File

@ -26,7 +26,6 @@ import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.channel.socket.DatagramPacket;
@ -519,11 +518,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
}
@Override
protected EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) {
return new EpollRecvByteAllocatorMessageHandle(handle, isFlagSet(Native.EPOLLET));
}
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
@ -531,9 +525,10 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
return;
}
DatagramChannelConfig config = config();
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
@ -541,7 +536,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
Throwable exception = null;
@ -561,7 +555,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
remoteAddress = fd().recvFrom(nioData, nioData.position(), nioData.limit());
}
epollInReadAttempted();
if (remoteAddress == null) {
allocHandle.lastBytesRead(-1);
data.release();
data = null;
break;
@ -570,7 +566,6 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
allocHandle.incMessagesRead(1);
allocHandle.lastBytesRead(remoteAddress.receivedAmount());
data.writerIndex(data.writerIndex() + allocHandle.lastBytesRead());
readPending = false;
readBuf.add(new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
data = null;
@ -589,22 +584,15 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
}
readBuf.clear();
allocHandle.readComplete();
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
checkResetEpollIn(edgeTriggered);
checkResetEpollIn(allocHandle.isEdgeTriggered());
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn();
}
epollInFinally(config);
}
}
}

View File

@ -23,7 +23,6 @@ import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import io.netty.util.internal.OneTimeTask;
import java.net.SocketAddress;
@ -152,51 +151,49 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
if (fd().isInputShutdown()) {
return;
}
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelConfig config = config();
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
if (!readPending && !allocHandle.isEdgeTriggered() && !config.isAutoRead()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
clearEpollIn0();
return;
}
final ChannelPipeline pipeline = pipeline();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
try {
do {
int socketFd = Native.recvFd(fd().intValue());
if (socketFd == 0) {
break;
}
if (socketFd == -1) {
readLoop: do {
// lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
// EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
// enabled.
allocHandle.lastBytesRead(Native.recvFd(fd().intValue()));
epollInReadAttempted();
switch(allocHandle.lastBytesRead()) {
case 0:
break readLoop;
case -1:
close(voidPromise());
return;
}
readPending = false;
default:
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(new FileDescriptor(socketFd));
pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead()));
break;
}
} while (allocHandle.continueReading());
allocHandle.readComplete();
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
} catch (Throwable t) {
allocHandle.readComplete();
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
checkResetEpollIn(edgeTriggered);
checkResetEpollIn(allocHandle.isEdgeTriggered());
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn0();
}
epollInFinally(config);
}
}
}

View File

@ -15,26 +15,47 @@
*/
package io.netty.channel.epoll;
import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
abstract class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle {
private final boolean isEdgeTriggered;
class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle {
private boolean isEdgeTriggered;
private final ChannelConfig config;
private boolean receivedRdHup;
public EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) {
super(handle);
this.isEdgeTriggered = isEdgeTriggered;
this.config = config;
}
public final boolean isEdgeTriggered() {
return isEdgeTriggered;
}
public final void receivedRdHup() {
final void receivedRdHup() {
receivedRdHup = true;
}
public final boolean isRdHup() {
return receivedRdHup;
boolean maybeMoreDataToRead() {
return isEdgeTriggered && lastBytesRead() > 0;
}
final void edgeTriggered(boolean edgeTriggered) {
isEdgeTriggered = edgeTriggered;
}
final boolean isEdgeTriggered() {
return isEdgeTriggered;
}
@Override
public final boolean continueReading() {
/**
* EPOLL ET requires that we read until we get an EAGAIN
* (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>). However in order to
* respect auto read we supporting reading to stop if auto read is off. If auto read is on we force reading to
* continue to avoid a {@link java.lang.StackOverflowError} between channelReadComplete and reading from the
* channel. It is expected that the {@link #EpollSocketChannel} implementations will track if we are in
* edgeTriggered mode and all data was not read, and will force a EPOLLIN ready event.
*
* If EPOLLRDHUP has been received we must read until we get a read error.
*/
return receivedRdHup || maybeMoreDataToRead() && config.isAutoRead() || super.continueReading();
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.RecvByteBufAllocator;
/**
* Respects termination conditions for EPOLL message (aka packet) based protocols.
*/
final class EpollRecvByteAllocatorMessageHandle extends EpollRecvByteAllocatorHandle {
public EpollRecvByteAllocatorMessageHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
super(handle, isEdgeTriggered);
}
@Override
public boolean continueReading() {
/**
* If edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise. For
* packet oriented descriptors must read until we get a EAGAIN
* (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>).
*
* If EPOLLRDHUP has been received we must read until we get a read error.
*/
return isEdgeTriggered() || isRdHup() ? true : super.continueReading();
}
}

View File

@ -15,27 +15,20 @@
*/
package io.netty.channel.epoll;
import io.netty.channel.ChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
/**
* EPOLL must read until no more data is available while in edge triggered mode. This class will always continue reading
* unless the last read did not fill up the available buffer space.
*/
final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocatorHandle {
public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, boolean isEdgeTriggered) {
super(handle, isEdgeTriggered);
public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) {
super(handle, config);
}
@Override
public boolean continueReading() {
boolean maybeMoreDataToRead() {
/**
* if edgeTriggered is used we need to read all bytes/messages as we are not notified again otherwise.
* For stream oriented descriptors we can assume we are done reading if the last read attempt didn't produce
* a full buffer (see Q9 in <a href="http://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>).
*
* If EPOLLRDHUP has been received we must read until we get a read error.
*/
return isRdHup() ? true :
isEdgeTriggered() ? lastBytesRead() == attemptedBytesRead() : super.continueReading();
return isEdgeTriggered() && lastBytesRead() == attemptedBytesRead();
}
}

View File

@ -17,14 +17,18 @@ package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.util.ReferenceCountUtil;
import org.junit.Assert;
@ -34,8 +38,10 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -128,6 +134,76 @@ public class EpollSocketChannelTest {
}
}
@Test
public void testAutoReadOffDuringReadOnlyReadsOneTime() throws InterruptedException {
EventLoopGroup group = new EpollEventLoopGroup();
try {
runAutoReadTest(group, EpollServerSocketChannel.class, EpollSocketChannel.class,
new InetSocketAddress(0));
runAutoReadTest(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class,
EpollSocketTestPermutation.newSocketAddress());
} finally {
group.shutdownGracefully();
}
}
private void runAutoReadTest(EventLoopGroup group, Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr) throws InterruptedException {
Channel serverChannel = null;
Channel clientChannel = null;
try {
AutoReadInitializer serverInitializer = new AutoReadInitializer();
AutoReadInitializer clientInitializer = new AutoReadInitializer();
ServerBootstrap sb = new ServerBootstrap();
sb.option(ChannelOption.SO_BACKLOG, 1024)
.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(ChannelOption.AUTO_READ, true)
.group(group)
.channel(serverChannelClass)
.childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.childOption(ChannelOption.AUTO_READ, true)
// We want to ensure that we attempt multiple individual read operations per read loop so we can
// test the auto read feature being turned off when data is first read.
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
.childHandler(serverInitializer);
serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel();
Bootstrap b = new Bootstrap()
.group(group)
.channel(channelClass)
.remoteAddress(serverChannel.localAddress())
.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
.option(ChannelOption.AUTO_READ, true)
// We want to ensure that we attempt multiple individual read operations per read loop so we can
// test the auto read feature being turned off when data is first read.
.option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
.handler(clientInitializer);
clientChannel = b.connect().syncUninterruptibly().channel();
// 3 bytes means 3 independent reads for TestRecvByteBufAllocator
clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
serverInitializer.autoReadHandler.assertSingleRead();
// 3 bytes means 3 independent reads for TestRecvByteBufAllocator
serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
clientInitializer.autoReadHandler.assertSingleRead();
serverInitializer.channel.read();
serverInitializer.autoReadHandler.assertSingleReadSecondTry();
clientChannel.read();
clientInitializer.autoReadHandler.assertSingleReadSecondTry();
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
}
}
private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class<? extends ServerChannel> serverChannelClass,
Class<? extends Channel> channelClass, SocketAddress bindAddr) throws InterruptedException {
Channel serverChannel = null;
@ -168,6 +244,77 @@ public class EpollSocketChannelTest {
}
}
/**
* Designed to keep reading as long as autoread is enabled.
*/
private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
@Override
public Handle newHandle() {
return new Handle() {
private ChannelConfig config;
private int attemptedBytesRead;
private int lastBytesRead;
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
@Override
public int guess() {
return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
}
@Override
public void reset(ChannelConfig config) {
this.config = config;
}
@Override
public void incMessagesRead(int numMessages) {
}
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
}
@Override
public int lastBytesRead() {
return lastBytesRead;
}
@Override
public void attemptedBytesRead(int bytes) {
attemptedBytesRead = bytes;
}
@Override
public int attemptedBytesRead() {
return attemptedBytesRead;
}
@Override
public boolean continueReading() {
return config.isAutoRead();
}
@Override
public void readComplete() {
}
};
}
}
private static class AutoReadInitializer extends ChannelInitializer<Channel> {
final AutoReadHandler autoReadHandler = new AutoReadHandler();
volatile Channel channel;
@Override
protected void initChannel(Channel ch) throws Exception {
channel = ch;
ch.pipeline().addLast(autoReadHandler);
}
}
private static class MyInitializer extends ChannelInitializer<Channel> {
final ExceptionHandler exceptionHandler = new ExceptionHandler();
@Override
@ -187,6 +334,36 @@ public class EpollSocketChannelTest {
}
}
private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
private final AtomicInteger count = new AtomicInteger();
private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch latch2 = new CountDownLatch(2);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
if (count.incrementAndGet() == 1) {
ctx.channel().config().setAutoRead(false);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
latch2.countDown();
}
void assertSingleRead() throws InterruptedException {
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(1, count.get());
}
void assertSingleReadSecondTry() throws InterruptedException {
assertTrue(latch2.await(5, TimeUnit.SECONDS));
assertEquals(2, count.get());
}
}
private static class ExceptionHandler extends ChannelInboundHandlerAdapter {
final AtomicLong count = new AtomicLong();
/**