Allow domain sockets to configure SO_SNDBUF and SO_RCVBUF (#9584)

Motivation:

Running tests with a `KQueueDomainSocketChannel` showed worse performance than an `NioSocketChannel`. It turns out that the default send buffer size for Nio sockets is 64k while for KQueue sockets it's 8k. I verified that manually setting the socket's send buffer size improved perf to expected levels.

Modification:

Plumb the `SO_SNDBUF` and `SO_RCVBUF` options into the `*DomainSocketChannelConfig`.

Result:

Can now configure send and receive buffer sizes for domain sockets.
This commit is contained in:
Joe Ellis 2019-09-20 16:28:53 -04:00 committed by Norman Maurer
parent 07fe1a299a
commit aebe2064d5
2 changed files with 98 additions and 3 deletions

View File

@ -24,9 +24,12 @@ import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.DomainSocketChannelConfig; import io.netty.channel.unix.DomainSocketChannelConfig;
import io.netty.channel.unix.DomainSocketReadMode; import io.netty.channel.unix.DomainSocketReadMode;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE; import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_SNDBUF;
import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE; import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE;
public final class EpollDomainSocketChannelConfig extends EpollChannelConfig public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
@ -40,7 +43,7 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
@Override @Override
public Map<ChannelOption<?>, Object> getOptions() { public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE); return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE, SO_SNDBUF, SO_RCVBUF);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -52,6 +55,12 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
if (option == ALLOW_HALF_CLOSURE) { if (option == ALLOW_HALF_CLOSURE) {
return (T) Boolean.valueOf(isAllowHalfClosure()); return (T) Boolean.valueOf(isAllowHalfClosure());
} }
if (option == SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
return super.getOption(option); return super.getOption(option);
} }
@ -63,6 +72,10 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
setReadMode((DomainSocketReadMode) value); setReadMode((DomainSocketReadMode) value);
} else if (option == ALLOW_HALF_CLOSURE) { } else if (option == ALLOW_HALF_CLOSURE) {
setAllowHalfClosure((Boolean) value); setAllowHalfClosure((Boolean) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
} }
@ -173,4 +186,38 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
this.allowHalfClosure = allowHalfClosure; this.allowHalfClosure = allowHalfClosure;
return this; return this;
} }
public int getSendBufferSize() {
try {
return ((EpollDomainSocketChannel) channel).socket.getSendBufferSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public EpollDomainSocketChannelConfig setSendBufferSize(int sendBufferSize) {
try {
((EpollDomainSocketChannel) channel).socket.setSendBufferSize(sendBufferSize);
return this;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public int getReceiveBufferSize() {
try {
return ((EpollDomainSocketChannel) channel).socket.getReceiveBufferSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public EpollDomainSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
try {
((EpollDomainSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize);
return this;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
} }

View File

@ -25,9 +25,13 @@ import io.netty.channel.unix.DomainSocketChannelConfig;
import io.netty.channel.unix.DomainSocketReadMode; import io.netty.channel.unix.DomainSocketReadMode;
import io.netty.util.internal.UnstableApi; import io.netty.util.internal.UnstableApi;
import java.io.IOException;
import java.util.Map; import java.util.Map;
import static io.netty.channel.unix.UnixChannelOption.*; import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_SNDBUF;
import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE;
@UnstableApi @UnstableApi
public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig implements DomainSocketChannelConfig { public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig implements DomainSocketChannelConfig {
@ -40,7 +44,7 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i
@Override @Override
public Map<ChannelOption<?>, Object> getOptions() { public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE); return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE, SO_SNDBUF, SO_RCVBUF);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -52,6 +56,12 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i
if (option == ALLOW_HALF_CLOSURE) { if (option == ALLOW_HALF_CLOSURE) {
return (T) Boolean.valueOf(isAllowHalfClosure()); return (T) Boolean.valueOf(isAllowHalfClosure());
} }
if (option == SO_SNDBUF) {
return (T) Integer.valueOf(getSendBufferSize());
}
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());
}
return super.getOption(option); return super.getOption(option);
} }
@ -63,6 +73,10 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i
setReadMode((DomainSocketReadMode) value); setReadMode((DomainSocketReadMode) value);
} else if (option == ALLOW_HALF_CLOSURE) { } else if (option == ALLOW_HALF_CLOSURE) {
setAllowHalfClosure((Boolean) value); setAllowHalfClosure((Boolean) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else { } else {
return super.setOption(option, value); return super.setOption(option, value);
} }
@ -159,6 +173,40 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i
return mode; return mode;
} }
public int getSendBufferSize() {
try {
return ((KQueueDomainSocketChannel) channel).socket.getSendBufferSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public KQueueDomainSocketChannelConfig setSendBufferSize(int sendBufferSize) {
try {
((KQueueDomainSocketChannel) channel).socket.setSendBufferSize(sendBufferSize);
return this;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public int getReceiveBufferSize() {
try {
return ((KQueueDomainSocketChannel) channel).socket.getReceiveBufferSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public KQueueDomainSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
try {
((KQueueDomainSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize);
return this;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/** /**
* @see SocketChannelConfig#isAllowHalfClosure() * @see SocketChannelConfig#isAllowHalfClosure()
*/ */