From 95652fef12926b78d8b6200cc37eb11cef9b950e Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 17 Mar 2021 12:58:06 +0100 Subject: [PATCH] Allow to configure the maximum number of message to write per eventloop (#11086) Motivation: Allow to configure the maximum number of messages to write per eventloop run. This can be useful to ensure we read data in a timely manner and not let writes dominate the CPU time. This is especially useful in protocols like QUIC where you need to read "fast enough" as otherwise you may not read the ACKs fast enough. Modifications: - Add new ChannelOption / config that allows to limit the number of messages to write per eventloop run. - Respect this setting for DatagramChannels Result: Reduce the risk of having WRITES block the processing of other events in a timely manner Co-authored-by: terrarier2111 <58695553+terrarier2111@users.noreply.github.com> --- .../channel/epoll/EpollDatagramChannel.java | 38 ++++++++++--------- .../epoll/EpollDatagramChannelConfig.java | 7 ++++ .../epoll/NativeDatagramPacketArray.java | 18 ++++++--- .../channel/kqueue/KQueueDatagramChannel.java | 15 +++++--- .../kqueue/KQueueDatagramChannelConfig.java | 6 +++ .../io/netty/channel/AbstractChannel.java | 12 ++++++ .../java/io/netty/channel/ChannelOption.java | 2 + .../netty/channel/DefaultChannelConfig.java | 27 ++++++++++++- .../nio/AbstractNioMessageChannel.java | 24 +++++++----- .../socket/DefaultDatagramChannelConfig.java | 6 +++ 10 files changed, 117 insertions(+), 38 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index ceb2a26abf..7a8836ea6b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -287,11 +287,11 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { - for (;;) { + int maxMessagesPerWrite = maxMessagesPerWrite(); + while (maxMessagesPerWrite > 0) { Object msg = in.current(); if (msg == null) { // Wrote all messages. - clearFlag(Native.EPOLLOUT); break; } @@ -301,7 +301,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements // We only handle UDP_SEGMENT in sendmmsg. in.current() instanceof SegmentedDatagramPacket) { NativeDatagramPacketArray array = cleanDatagramPacketArray(); - array.add(in, isConnected()); + array.add(in, isConnected(), maxMessagesPerWrite); int cnt = array.count(); if (cnt >= 1) { @@ -309,19 +309,15 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements int offset = 0; NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets(); - while (cnt > 0) { - int send = socket.sendmmsg(packets, offset, cnt); - if (send == 0) { - // Did not write all messages. - setFlag(Native.EPOLLOUT); - return; - } - for (int i = 0; i < send; i++) { - in.remove(); - } - cnt -= send; - offset += send; + int send = socket.sendmmsg(packets, offset, cnt); + if (send == 0) { + // Did not write all messages. + break; } + for (int i = 0; i < send; i++) { + in.remove(); + } + maxMessagesPerWrite -= send; continue; } } @@ -335,18 +331,26 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (done) { in.remove(); + maxMessagesPerWrite --; } else { - // Did not write all messages. - setFlag(Native.EPOLLOUT); break; } } catch (IOException e) { + maxMessagesPerWrite --; // Continue on write error as a DatagramChannel can write to multiple remote peers // // See https://github.com/netty/netty/issues/2665 in.remove(e); } } + + if (in.isEmpty()) { + // Did write all messages. + clearFlag(Native.EPOLLOUT); + } else { + // Did not write all messages. + setFlag(Native.EPOLLOUT); + } } private boolean doWriteMessage(Object msg) throws Exception { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index 6fc0defcd2..bca61f3a45 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; @@ -526,4 +527,10 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme public int getMaxDatagramPayloadSize() { return maxDatagramSize; } + + @Override + public EpollDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { + super.setMaxMessagesPerWrite(maxMessagesPerWrite); + return this; + } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index 60e4de8bae..6e3e7b1d7f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -79,8 +79,9 @@ final class NativeDatagramPacketArray { return true; } - void add(ChannelOutboundBuffer buffer, boolean connected) throws Exception { + void add(ChannelOutboundBuffer buffer, boolean connected, int maxMessagesPerWrite) throws Exception { processor.connected = connected; + processor.maxMessagesPerWrite = maxMessagesPerWrite; buffer.forEachFlushedMessage(processor); } @@ -109,9 +110,11 @@ final class NativeDatagramPacketArray { private final class MyMessageProcessor implements MessageProcessor { private boolean connected; + private int maxMessagesPerWrite; @Override public boolean processMessage(Object msg) { + final boolean added; if (msg instanceof DatagramPacket) { DatagramPacket packet = (DatagramPacket) msg; ByteBuf buf = packet.content(); @@ -124,11 +127,16 @@ final class NativeDatagramPacketArray { segmentSize = seg; } } - return add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient()); - } - if (msg instanceof ByteBuf && connected) { + added = add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient()); + } else if (msg instanceof ByteBuf && connected) { ByteBuf buf = (ByteBuf) msg; - return add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null); + added = add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null); + } else { + added = false; + } + if (added) { + maxMessagesPerWrite--; + return maxMessagesPerWrite > 0; } return false; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 7aa171f31b..a19401dd8a 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -245,11 +245,10 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { - for (;;) { + int maxMessagesPerWrite = maxMessagesPerWrite(); + while (maxMessagesPerWrite > 0) { Object msg = in.current(); if (msg == null) { - // Wrote all messages. - writeFilter(false); break; } @@ -264,18 +263,22 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement if (done) { in.remove(); + maxMessagesPerWrite --; } else { - // Did not write all messages. - writeFilter(true); - break; + break; } } catch (IOException e) { + maxMessagesPerWrite --; + // Continue on write error as a DatagramChannel can write to multiple remote peers // // See https://github.com/netty/netty/issues/2665 in.remove(e); } } + + // Whether all messages were written or not. + writeFilter(in.isEmpty()); } private boolean doWriteMessage(Object msg) throws Exception { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannelConfig.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannelConfig.java index 73810132b7..ee6b070e66 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannelConfig.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannelConfig.java @@ -382,4 +382,10 @@ public final class KQueueDatagramChannelConfig extends KQueueChannelConfig imple public KQueueDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) { throw new UnsupportedOperationException("Multicast not supported"); } + + @Override + public KQueueDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { + super.setMaxMessagesPerWrite(maxMessagesPerWrite); + return this; + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index ae5d0adb02..6589cd5bd5 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -88,6 +88,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha pipeline = newChannelPipeline(); } + protected final int maxMessagesPerWrite() { + ChannelConfig config = config(); + if (config instanceof DefaultChannelConfig) { + return ((DefaultChannelConfig) config).getMaxMessagesPerWrite(); + } + Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE); + if (value == null) { + return Integer.MAX_VALUE; + } + return value; + } + @Override public final ChannelId id() { return id; diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index d75ed02393..c08cd28f91 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -86,6 +86,8 @@ public class ChannelOption extends AbstractConstant> { */ @Deprecated public static final ChannelOption MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ"); + public static final ChannelOption MAX_MESSAGES_PER_WRITE = valueOf("MAX_MESSAGES_PER_WRITE"); + public static final ChannelOption WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT"); /** * @deprecated Use {@link #WRITE_BUFFER_WATER_MARK} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 09e5e0104b..e303a668e1 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -29,6 +29,7 @@ import static io.netty.channel.ChannelOption.AUTO_CLOSE; import static io.netty.channel.ChannelOption.AUTO_READ; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ; +import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_WRITE; import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR; import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR; import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; @@ -62,6 +63,8 @@ public class DefaultChannelConfig implements ChannelConfig { private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; private volatile int writeSpinCount = 16; + private volatile int maxMessagesPerWrite = Integer.MAX_VALUE; + @SuppressWarnings("FieldMayBeFinal") private volatile int autoRead = 1; private volatile boolean autoClose = true; @@ -85,7 +88,7 @@ public class DefaultChannelConfig implements ChannelConfig { CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR, - SINGLE_EVENTEXECUTOR_PER_GROUP); + SINGLE_EVENTEXECUTOR_PER_GROUP, MAX_MESSAGES_PER_WRITE); } protected Map, Object> getOptions( @@ -155,6 +158,9 @@ public class DefaultChannelConfig implements ChannelConfig { if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { return (T) Boolean.valueOf(getPinEventExecutorPerGroup()); } + if (option == MAX_MESSAGES_PER_WRITE) { + return (T) Integer.valueOf(getMaxMessagesPerWrite()); + } return null; } @@ -187,6 +193,8 @@ public class DefaultChannelConfig implements ChannelConfig { setMessageSizeEstimator((MessageSizeEstimator) value); } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { setPinEventExecutorPerGroup((Boolean) value); + } else if (option == MAX_MESSAGES_PER_WRITE) { + setMaxMessagesPerWrite((Integer) value); } else { return false; } @@ -247,6 +255,23 @@ public class DefaultChannelConfig implements ChannelConfig { } } + /** + * Get the maximum number of message to write per eventloop run. Once this limit is + * reached we will continue to process other events before trying to write the remaining messages. + */ + public int getMaxMessagesPerWrite() { + return maxMessagesPerWrite; + } + + /** + * Set the maximum number of message to write per eventloop run. Once this limit is + * reached we will continue to process other events before trying to write the remaining messages. + */ + public ChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { + this.maxMessagesPerWrite = ObjectUtil.checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite"); + return this; + } + @Override public int getWriteSpinCount() { return writeSpinCount; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index b49d5f9e03..15d32460af 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -127,13 +127,10 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); - for (;;) { + int maxMessagesPerWrite = maxMessagesPerWrite(); + while (maxMessagesPerWrite > 0) { Object msg = in.current(); if (msg == null) { - // Wrote all messages. - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - key.interestOps(interestOps & ~SelectionKey.OP_WRITE); - } break; } try { @@ -146,22 +143,31 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { } if (done) { + maxMessagesPerWrite--; in.remove(); } else { - // Did not write all messages. - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - key.interestOps(interestOps | SelectionKey.OP_WRITE); - } break; } } catch (Exception e) { if (continueOnWriteError()) { + maxMessagesPerWrite--; in.remove(e); } else { throw e; } } } + if (in.isEmpty()) { + // Wrote all messages. + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + key.interestOps(interestOps & ~SelectionKey.OP_WRITE); + } + } else { + // Did not write all messages. + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + key.interestOps(interestOps | SelectionKey.OP_WRITE); + } + } } /** diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java index 1a267c1e19..32a8049501 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java @@ -426,4 +426,10 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement super.setMessageSizeEstimator(estimator); return this; } + + @Override + public DatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { + super.setMaxMessagesPerWrite(maxMessagesPerWrite); + return this; + } }