From bf52f39e598aa28d0813754e462bbcce5ff5d5c2 Mon Sep 17 00:00:00 2001 From: Joe Ellis Date: Fri, 20 Sep 2019 16:28:53 -0400 Subject: [PATCH] Allow domain sockets to configure SO_SNDBUF and SO_RCVBUF (#9584) Motivation: Running tests with a `KQueueDomainSocketChannel` showed worse performance than an `NioSocketChannel`. It turns out that the default send buffer size for Nio sockets is 64k while for KQueue sockets it's 8k. I verified that manually setting the socket's send buffer size improved perf to expected levels. Modification: Plumb the `SO_SNDBUF` and `SO_RCVBUF` options into the `*DomainSocketChannelConfig`. Result: Can now configure send and receive buffer sizes for domain sockets. --- .../epoll/EpollDomainSocketChannelConfig.java | 49 ++++++++++++++++- .../KQueueDomainSocketChannelConfig.java | 53 +++++++++++++++++-- 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java index 8f9d4713e5..5146dbe4f1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannelConfig.java @@ -26,9 +26,12 @@ import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.DomainSocketChannelConfig; import io.netty.channel.unix.DomainSocketReadMode; +import java.io.IOException; import java.util.Map; import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_SNDBUF; import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE; public final class EpollDomainSocketChannelConfig extends EpollChannelConfig @@ -42,7 +45,7 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE); + return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE, SO_SNDBUF, SO_RCVBUF); } @SuppressWarnings("unchecked") @@ -54,6 +57,12 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig if (option == ALLOW_HALF_CLOSURE) { return (T) Boolean.valueOf(isAllowHalfClosure()); } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } return super.getOption(option); } @@ -65,6 +74,10 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig setReadMode((DomainSocketReadMode) value); } else if (option == ALLOW_HALF_CLOSURE) { setAllowHalfClosure((Boolean) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); } else { return super.setOption(option, value); } @@ -173,4 +186,38 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig this.allowHalfClosure = allowHalfClosure; return this; } + + public int getSendBufferSize() { + try { + return ((EpollDomainSocketChannel) channel).socket.getSendBufferSize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public EpollDomainSocketChannelConfig setSendBufferSize(int sendBufferSize) { + try { + ((EpollDomainSocketChannel) channel).socket.setSendBufferSize(sendBufferSize); + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getReceiveBufferSize() { + try { + return ((EpollDomainSocketChannel) channel).socket.getReceiveBufferSize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public EpollDomainSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + try { + ((EpollDomainSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize); + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannelConfig.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannelConfig.java index 50a7d7b324..aa29a47aba 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannelConfig.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannelConfig.java @@ -25,11 +25,14 @@ import io.netty.channel.unix.DomainSocketChannelConfig; import io.netty.channel.unix.DomainSocketReadMode; import io.netty.util.internal.UnstableApi; +import java.io.IOException; import java.util.Map; -import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE; -import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE; import static java.util.Objects.requireNonNull; +import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE; +import static io.netty.channel.ChannelOption.SO_RCVBUF; +import static io.netty.channel.ChannelOption.SO_SNDBUF; +import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE; @UnstableApi public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig implements DomainSocketChannelConfig { @@ -42,7 +45,7 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE); + return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE, SO_SNDBUF, SO_RCVBUF); } @SuppressWarnings("unchecked") @@ -54,6 +57,12 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i if (option == ALLOW_HALF_CLOSURE) { return (T) Boolean.valueOf(isAllowHalfClosure()); } + if (option == SO_SNDBUF) { + return (T) Integer.valueOf(getSendBufferSize()); + } + if (option == SO_RCVBUF) { + return (T) Integer.valueOf(getReceiveBufferSize()); + } return super.getOption(option); } @@ -65,6 +74,10 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i setReadMode((DomainSocketReadMode) value); } else if (option == ALLOW_HALF_CLOSURE) { setAllowHalfClosure((Boolean) value); + } else if (option == SO_SNDBUF) { + setSendBufferSize((Integer) value); + } else if (option == SO_RCVBUF) { + setReceiveBufferSize((Integer) value); } else { return super.setOption(option, value); } @@ -159,6 +172,40 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i return mode; } + public int getSendBufferSize() { + try { + return ((KQueueDomainSocketChannel) channel).socket.getSendBufferSize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public KQueueDomainSocketChannelConfig setSendBufferSize(int sendBufferSize) { + try { + ((KQueueDomainSocketChannel) channel).socket.setSendBufferSize(sendBufferSize); + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public int getReceiveBufferSize() { + try { + return ((KQueueDomainSocketChannel) channel).socket.getReceiveBufferSize(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public KQueueDomainSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + try { + ((KQueueDomainSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize); + return this; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * @see SocketChannelConfig#isAllowHalfClosure() */