diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index ceb2a26abf..7a8836ea6b 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -287,11 +287,11 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { - for (;;) { + int maxMessagesPerWrite = maxMessagesPerWrite(); + while (maxMessagesPerWrite > 0) { Object msg = in.current(); if (msg == null) { // Wrote all messages. - clearFlag(Native.EPOLLOUT); break; } @@ -301,7 +301,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements // We only handle UDP_SEGMENT in sendmmsg. in.current() instanceof SegmentedDatagramPacket) { NativeDatagramPacketArray array = cleanDatagramPacketArray(); - array.add(in, isConnected()); + array.add(in, isConnected(), maxMessagesPerWrite); int cnt = array.count(); if (cnt >= 1) { @@ -309,19 +309,15 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements int offset = 0; NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets(); - while (cnt > 0) { - int send = socket.sendmmsg(packets, offset, cnt); - if (send == 0) { - // Did not write all messages. - setFlag(Native.EPOLLOUT); - return; - } - for (int i = 0; i < send; i++) { - in.remove(); - } - cnt -= send; - offset += send; + int send = socket.sendmmsg(packets, offset, cnt); + if (send == 0) { + // Did not write all messages. + break; } + for (int i = 0; i < send; i++) { + in.remove(); + } + maxMessagesPerWrite -= send; continue; } } @@ -335,18 +331,26 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (done) { in.remove(); + maxMessagesPerWrite --; } else { - // Did not write all messages. - setFlag(Native.EPOLLOUT); break; } } catch (IOException e) { + maxMessagesPerWrite --; // Continue on write error as a DatagramChannel can write to multiple remote peers // // See https://github.com/netty/netty/issues/2665 in.remove(e); } } + + if (in.isEmpty()) { + // Did write all messages. + clearFlag(Native.EPOLLOUT); + } else { + // Did not write all messages. + setFlag(Native.EPOLLOUT); + } } private boolean doWriteMessage(Object msg) throws Exception { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index 6fc0defcd2..bca61f3a45 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; import io.netty.channel.FixedRecvByteBufAllocator; @@ -526,4 +527,10 @@ public final class EpollDatagramChannelConfig extends EpollChannelConfig impleme public int getMaxDatagramPayloadSize() { return maxDatagramSize; } + + @Override + public EpollDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { + super.setMaxMessagesPerWrite(maxMessagesPerWrite); + return this; + } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java index 60e4de8bae..6e3e7b1d7f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/NativeDatagramPacketArray.java @@ -79,8 +79,9 @@ final class NativeDatagramPacketArray { return true; } - void add(ChannelOutboundBuffer buffer, boolean connected) throws Exception { + void add(ChannelOutboundBuffer buffer, boolean connected, int maxMessagesPerWrite) throws Exception { processor.connected = connected; + processor.maxMessagesPerWrite = maxMessagesPerWrite; buffer.forEachFlushedMessage(processor); } @@ -109,9 +110,11 @@ final class NativeDatagramPacketArray { private final class MyMessageProcessor implements MessageProcessor { private boolean connected; + private int maxMessagesPerWrite; @Override public boolean processMessage(Object msg) { + final boolean added; if (msg instanceof DatagramPacket) { DatagramPacket packet = (DatagramPacket) msg; ByteBuf buf = packet.content(); @@ -124,11 +127,16 @@ final class NativeDatagramPacketArray { segmentSize = seg; } } - return add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient()); - } - if (msg instanceof ByteBuf && connected) { + added = add0(buf, buf.readerIndex(), buf.readableBytes(), segmentSize, packet.recipient()); + } else if (msg instanceof ByteBuf && connected) { ByteBuf buf = (ByteBuf) msg; - return add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null); + added = add0(buf, buf.readerIndex(), buf.readableBytes(), 0, null); + } else { + added = false; + } + if (added) { + maxMessagesPerWrite--; + return maxMessagesPerWrite > 0; } return false; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 7aa171f31b..a19401dd8a 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -245,11 +245,10 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { - for (;;) { + int maxMessagesPerWrite = maxMessagesPerWrite(); + while (maxMessagesPerWrite > 0) { Object msg = in.current(); if (msg == null) { - // Wrote all messages. - writeFilter(false); break; } @@ -264,18 +263,22 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement if (done) { in.remove(); + maxMessagesPerWrite --; } else { - // Did not write all messages. - writeFilter(true); - break; + break; } } catch (IOException e) { + maxMessagesPerWrite --; + // Continue on write error as a DatagramChannel can write to multiple remote peers // // See https://github.com/netty/netty/issues/2665 in.remove(e); } } + + // Whether all messages were written or not. + writeFilter(in.isEmpty()); } private boolean doWriteMessage(Object msg) throws Exception { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannelConfig.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannelConfig.java index 73810132b7..ee6b070e66 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannelConfig.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannelConfig.java @@ -382,4 +382,10 @@ public final class KQueueDatagramChannelConfig extends KQueueChannelConfig imple public KQueueDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) { throw new UnsupportedOperationException("Multicast not supported"); } + + @Override + public KQueueDatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { + super.setMaxMessagesPerWrite(maxMessagesPerWrite); + return this; + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index ae5d0adb02..6589cd5bd5 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -88,6 +88,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha pipeline = newChannelPipeline(); } + protected final int maxMessagesPerWrite() { + ChannelConfig config = config(); + if (config instanceof DefaultChannelConfig) { + return ((DefaultChannelConfig) config).getMaxMessagesPerWrite(); + } + Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE); + if (value == null) { + return Integer.MAX_VALUE; + } + return value; + } + @Override public final ChannelId id() { return id; diff --git a/transport/src/main/java/io/netty/channel/ChannelOption.java b/transport/src/main/java/io/netty/channel/ChannelOption.java index d75ed02393..c08cd28f91 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOption.java +++ b/transport/src/main/java/io/netty/channel/ChannelOption.java @@ -86,6 +86,8 @@ public class ChannelOption extends AbstractConstant> { */ @Deprecated public static final ChannelOption MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ"); + public static final ChannelOption MAX_MESSAGES_PER_WRITE = valueOf("MAX_MESSAGES_PER_WRITE"); + public static final ChannelOption WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT"); /** * @deprecated Use {@link #WRITE_BUFFER_WATER_MARK} diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 09e5e0104b..e303a668e1 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -29,6 +29,7 @@ import static io.netty.channel.ChannelOption.AUTO_CLOSE; import static io.netty.channel.ChannelOption.AUTO_READ; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ; +import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_WRITE; import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR; import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR; import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; @@ -62,6 +63,8 @@ public class DefaultChannelConfig implements ChannelConfig { private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; private volatile int writeSpinCount = 16; + private volatile int maxMessagesPerWrite = Integer.MAX_VALUE; + @SuppressWarnings("FieldMayBeFinal") private volatile int autoRead = 1; private volatile boolean autoClose = true; @@ -85,7 +88,7 @@ public class DefaultChannelConfig implements ChannelConfig { CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR, - SINGLE_EVENTEXECUTOR_PER_GROUP); + SINGLE_EVENTEXECUTOR_PER_GROUP, MAX_MESSAGES_PER_WRITE); } protected Map, Object> getOptions( @@ -155,6 +158,9 @@ public class DefaultChannelConfig implements ChannelConfig { if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { return (T) Boolean.valueOf(getPinEventExecutorPerGroup()); } + if (option == MAX_MESSAGES_PER_WRITE) { + return (T) Integer.valueOf(getMaxMessagesPerWrite()); + } return null; } @@ -187,6 +193,8 @@ public class DefaultChannelConfig implements ChannelConfig { setMessageSizeEstimator((MessageSizeEstimator) value); } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { setPinEventExecutorPerGroup((Boolean) value); + } else if (option == MAX_MESSAGES_PER_WRITE) { + setMaxMessagesPerWrite((Integer) value); } else { return false; } @@ -247,6 +255,23 @@ public class DefaultChannelConfig implements ChannelConfig { } } + /** + * Get the maximum number of message to write per eventloop run. Once this limit is + * reached we will continue to process other events before trying to write the remaining messages. + */ + public int getMaxMessagesPerWrite() { + return maxMessagesPerWrite; + } + + /** + * Set the maximum number of message to write per eventloop run. Once this limit is + * reached we will continue to process other events before trying to write the remaining messages. + */ + public ChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { + this.maxMessagesPerWrite = ObjectUtil.checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite"); + return this; + } + @Override public int getWriteSpinCount() { return writeSpinCount; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index b49d5f9e03..15d32460af 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -127,13 +127,10 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); - for (;;) { + int maxMessagesPerWrite = maxMessagesPerWrite(); + while (maxMessagesPerWrite > 0) { Object msg = in.current(); if (msg == null) { - // Wrote all messages. - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - key.interestOps(interestOps & ~SelectionKey.OP_WRITE); - } break; } try { @@ -146,22 +143,31 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { } if (done) { + maxMessagesPerWrite--; in.remove(); } else { - // Did not write all messages. - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - key.interestOps(interestOps | SelectionKey.OP_WRITE); - } break; } } catch (Exception e) { if (continueOnWriteError()) { + maxMessagesPerWrite--; in.remove(e); } else { throw e; } } } + if (in.isEmpty()) { + // Wrote all messages. + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + key.interestOps(interestOps & ~SelectionKey.OP_WRITE); + } + } else { + // Did not write all messages. + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + key.interestOps(interestOps | SelectionKey.OP_WRITE); + } + } } /** diff --git a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java index 1a267c1e19..32a8049501 100644 --- a/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/DefaultDatagramChannelConfig.java @@ -426,4 +426,10 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement super.setMessageSizeEstimator(estimator); return this; } + + @Override + public DatagramChannelConfig setMaxMessagesPerWrite(int maxMessagesPerWrite) { + super.setMaxMessagesPerWrite(maxMessagesPerWrite); + return this; + } }