Allow to configure the maximum number of message to write per eventloop (#11086)

Motivation:

Allow to configure the maximum number of messages to write per eventloop run. This can be useful to ensure we read data in a timely manner and not let writes dominate the CPU time. This is especially useful in protocols like QUIC where you need to read "fast enough" as otherwise you may not read the ACKs fast enough.

Modifications:

- Add new ChannelOption / config that allows to limit the number of messages to write per eventloop run.
- Respect this setting for DatagramChannels

Result:

Reduce the risk of having WRITES block the processing of other events in a timely manner

Co-authored-by: terrarier2111 <58695553+terrarier2111@users.noreply.github.com>
This commit is contained in:
Norman Maurer 2021-03-17 12:58:06 +01:00
parent a19d9c2ab6
commit 817052d019
10 changed files with 117 additions and 38 deletions

View File

@ -288,11 +288,11 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int maxMessagesPerWrite = maxMessagesPerWrite();
while (maxMessagesPerWrite > 0) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearFlag(Native.EPOLLOUT);
break;
}
@ -302,7 +302,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
// We only handle UDP_SEGMENT in sendmmsg.
in.current() instanceof SegmentedDatagramPacket) {
NativeDatagramPacketArray array = cleanDatagramPacketArray();
array.add(in, isConnected());
array.add(in, isConnected(), maxMessagesPerWrite);
int cnt = array.count();
if (cnt >= 1) {
@ -310,19 +310,15 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
int offset = 0;
NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
while (cnt > 0) {
int send = socket.sendmmsg(packets, offset, cnt);
if (send == 0) {
// Did not write all messages.
setFlag(Native.EPOLLOUT);
return;
}
for (int i = 0; i < send; i++) {
in.remove();
}
cnt -= send;
offset += send;
int send = socket.sendmmsg(packets, offset, cnt);
if (send == 0) {
// Did not write all messages.
break;
}
for (int i = 0; i < send; i++) {
in.remove();
}
maxMessagesPerWrite -= send;
continue;
}
}
@ -336,18 +332,26 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (done) {
in.remove();
maxMessagesPerWrite --;
} else {
// Did not write all messages.
setFlag(Native.EPOLLOUT);
break;
}
} catch (IOException e) {
maxMessagesPerWrite --;
// Continue on write error as a DatagramChannel can write to multiple remote peers
//
// See https://github.com/netty/netty/issues/2665
in.remove(e);
}
}
if (in.isEmpty()) {
// Did write all messages.
clearFlag(Native.EPOLLOUT);
} else {
// Did not write all messages.
setFlag(Native.EPOLLOUT);
}
}
private boolean doWriteMessage(Object msg) throws Exception {

View File

@ -17,6 +17,7 @@ package io.netty.channel.epoll;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
@ -520,4 +521,10 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme
public int getMaxDatagramPayloadSize() {
return maxDatagramSize;
}
@Override
public EpollDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) {
super.setMaxMessagesPerWrite(maxMessagesPerWrite);
return this;
}
}

View File

@ -80,8 +80,9 @@ final class NativeDatagramPacketArray {
return true;
}
void add(ChannelOutboundBuffer buffer, boolean connected) throws Exception {
void add(ChannelOutboundBuffer buffer, boolean connected, int maxMessagesPerWrite) throws Exception {
processor.connected = connected;
processor.maxMessagesPerWrite = maxMessagesPerWrite;
buffer.forEachFlushedMessage(processor);
}
@ -110,9 +111,11 @@ final class NativeDatagramPacketArray {
private final class MyMessageProcessor implements MessageProcessor {
private boolean connected;
private int maxMessagesPerWrite;
@Override
public boolean processMessage(Object msg) {
final boolean added;
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf buf = packet.content();
@ -125,11 +128,16 @@ final class NativeDatagramPacketArray {
segmentSize = seg;
}
}
return add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient());
added = add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient());
} else if (msg instanceof ByteBufConvertible && connected) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
added = add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null);
} else {
added = false;
}
if (msg instanceof ByteBufConvertible && connected) {
ByteBuf buf = ((ByteBufConvertible) msg).asByteBuf();
return add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null);
if (added) {
maxMessagesPerWrite--;
return maxMessagesPerWrite > 0;
}
return false;
}

View File

@ -246,11 +246,10 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int maxMessagesPerWrite = maxMessagesPerWrite();
while (maxMessagesPerWrite > 0) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
writeFilter(false);
break;
}
@ -265,18 +264,22 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
if (done) {
in.remove();
maxMessagesPerWrite --;
} else {
// Did not write all messages.
writeFilter(true);
break;
break;
}
} catch (IOException e) {
maxMessagesPerWrite --;
// Continue on write error as a DatagramChannel can write to multiple remote peers
//
// See https://github.com/netty/netty/issues/2665
in.remove(e);
}
}
// Whether all messages were written or not.
writeFilter(in.isEmpty());
}
private boolean doWriteMessage(Object msg) throws Exception {

View File

@ -382,4 +382,10 @@ public final class KQueueDatagramChannelConfig extends KQueueChannelConfig imple
public KQueueDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) {
throw new UnsupportedOperationException("Multicast not supported");
}
@Override
public KQueueDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) {
super.setMaxMessagesPerWrite(maxMessagesPerWrite);
return this;
}
}

View File

@ -101,6 +101,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private EventLoop validateEventLoop(EventLoop eventLoop) {
return requireNonNull(eventLoop, "eventLoop");
}
protected final int maxMessagesPerWrite() {
ChannelConfig config = config();
if (config instanceof DefaultChannelConfig) {
return ((DefaultChannelConfig) config).getMaxMessagesPerWrite();
}
Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE);
if (value == null) {
return Integer.MAX_VALUE;
}
return value;
}
@Override
public final ChannelId id() {

View File

@ -87,6 +87,8 @@ public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {
*/
@Deprecated
public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
public static final ChannelOption<Integer> MAX_MESSAGES_PER_WRITE = valueOf("MAX_MESSAGES_PER_WRITE");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
/**
* @deprecated Use {@link #WRITE_BUFFER_WATER_MARK}

View File

@ -16,6 +16,7 @@
package io.netty.channel;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.internal.ObjectUtil;
import java.util.IdentityHashMap;
import java.util.Map;
@ -28,6 +29,7 @@ import static io.netty.channel.ChannelOption.AUTO_CLOSE;
import static io.netty.channel.ChannelOption.AUTO_READ;
import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ;
import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_WRITE;
import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR;
import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK;
@ -60,6 +62,8 @@ public class DefaultChannelConfig implements ChannelConfig {
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
private volatile int writeSpinCount = 16;
private volatile int maxMessagesPerWrite = Integer.MAX_VALUE;
@SuppressWarnings("FieldMayBeFinal")
private volatile int autoRead = 1;
private volatile boolean autoClose = true;
@ -82,7 +86,7 @@ public class DefaultChannelConfig implements ChannelConfig {
null,
CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR);
WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR, MAX_MESSAGES_PER_WRITE);
}
protected Map<ChannelOption<?>, Object> getOptions(
@ -149,6 +153,9 @@ public class DefaultChannelConfig implements ChannelConfig {
if (option == MESSAGE_SIZE_ESTIMATOR) {
return (T) getMessageSizeEstimator();
}
if (option == MAX_MESSAGES_PER_WRITE) {
return (T) Integer.valueOf(getMaxMessagesPerWrite());
}
return null;
}
@ -179,6 +186,8 @@ public class DefaultChannelConfig implements ChannelConfig {
setWriteBufferWaterMark((WriteBufferWaterMark) value);
} else if (option == MESSAGE_SIZE_ESTIMATOR) {
setMessageSizeEstimator((MessageSizeEstimator) value);
} else if (option == MAX_MESSAGES_PER_WRITE) {
setMaxMessagesPerWrite((Integer) value);
} else {
return false;
}
@ -240,6 +249,23 @@ public class DefaultChannelConfig implements ChannelConfig {
}
}
/**
* Get the maximum number of message to write per eventloop run. Once this limit is
* reached we will continue to process other events before trying to write the remaining messages.
*/
public int getMaxMessagesPerWrite() {
return maxMessagesPerWrite;
}
/**
* Set the maximum number of message to write per eventloop run. Once this limit is
* reached we will continue to process other events before trying to write the remaining messages.
*/
public ChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) {
this.maxMessagesPerWrite = ObjectUtil.checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite");
return this;
}
@Override
public int getWriteSpinCount() {
return writeSpinCount;

View File

@ -133,13 +133,10 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
}
final int interestOps = key.interestOps();
for (;;) {
int maxMessagesPerWrite = maxMessagesPerWrite();
while (maxMessagesPerWrite > 0) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
@ -152,22 +149,31 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
}
if (done) {
maxMessagesPerWrite--;
in.remove();
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
if (continueOnWriteError()) {
maxMessagesPerWrite--;
in.remove(e);
} else {
throw e;
}
}
}
if (in.isEmpty()) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
}
/**

View File

@ -427,4 +427,10 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
super.setMessageSizeEstimator(estimator);
return this;
}
@Override
public DatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) {
super.setMaxMessagesPerWrite(maxMessagesPerWrite);
return this;
}
}