Allow domain sockets to configure SO_SNDBUF and SO_RCVBUF (#9584)
Motivation: Running tests with a `KQueueDomainSocketChannel` showed worse performance than an `NioSocketChannel`. It turns out that the default send buffer size for Nio sockets is 64k while for KQueue sockets it's 8k. I verified that manually setting the socket's send buffer size improved perf to expected levels. Modification: Plumb the `SO_SNDBUF` and `SO_RCVBUF` options into the `*DomainSocketChannelConfig`. Result: Can now configure send and receive buffer sizes for domain sockets.
This commit is contained in:
parent
07fe1a299a
commit
aebe2064d5
@ -24,9 +24,12 @@ import io.netty.channel.socket.SocketChannelConfig;
|
||||
import io.netty.channel.unix.DomainSocketChannelConfig;
|
||||
import io.netty.channel.unix.DomainSocketReadMode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE;
|
||||
import static io.netty.channel.ChannelOption.SO_RCVBUF;
|
||||
import static io.netty.channel.ChannelOption.SO_SNDBUF;
|
||||
import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE;
|
||||
|
||||
public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
|
||||
@ -40,7 +43,7 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
|
||||
|
||||
@Override
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE);
|
||||
return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE, SO_SNDBUF, SO_RCVBUF);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -52,6 +55,12 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
|
||||
if (option == ALLOW_HALF_CLOSURE) {
|
||||
return (T) Boolean.valueOf(isAllowHalfClosure());
|
||||
}
|
||||
if (option == SO_SNDBUF) {
|
||||
return (T) Integer.valueOf(getSendBufferSize());
|
||||
}
|
||||
if (option == SO_RCVBUF) {
|
||||
return (T) Integer.valueOf(getReceiveBufferSize());
|
||||
}
|
||||
return super.getOption(option);
|
||||
}
|
||||
|
||||
@ -63,6 +72,10 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
|
||||
setReadMode((DomainSocketReadMode) value);
|
||||
} else if (option == ALLOW_HALF_CLOSURE) {
|
||||
setAllowHalfClosure((Boolean) value);
|
||||
} else if (option == SO_SNDBUF) {
|
||||
setSendBufferSize((Integer) value);
|
||||
} else if (option == SO_RCVBUF) {
|
||||
setReceiveBufferSize((Integer) value);
|
||||
} else {
|
||||
return super.setOption(option, value);
|
||||
}
|
||||
@ -173,4 +186,38 @@ public final class EpollDomainSocketChannelConfig extends EpollChannelConfig
|
||||
this.allowHalfClosure = allowHalfClosure;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getSendBufferSize() {
|
||||
try {
|
||||
return ((EpollDomainSocketChannel) channel).socket.getSendBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public EpollDomainSocketChannelConfig setSendBufferSize(int sendBufferSize) {
|
||||
try {
|
||||
((EpollDomainSocketChannel) channel).socket.setSendBufferSize(sendBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public int getReceiveBufferSize() {
|
||||
try {
|
||||
return ((EpollDomainSocketChannel) channel).socket.getReceiveBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public EpollDomainSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
|
||||
try {
|
||||
((EpollDomainSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -25,9 +25,13 @@ import io.netty.channel.unix.DomainSocketChannelConfig;
|
||||
import io.netty.channel.unix.DomainSocketReadMode;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static io.netty.channel.unix.UnixChannelOption.*;
|
||||
import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE;
|
||||
import static io.netty.channel.ChannelOption.SO_RCVBUF;
|
||||
import static io.netty.channel.ChannelOption.SO_SNDBUF;
|
||||
import static io.netty.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE;
|
||||
|
||||
@UnstableApi
|
||||
public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig implements DomainSocketChannelConfig {
|
||||
@ -40,7 +44,7 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i
|
||||
|
||||
@Override
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE);
|
||||
return getOptions(super.getOptions(), DOMAIN_SOCKET_READ_MODE, ALLOW_HALF_CLOSURE, SO_SNDBUF, SO_RCVBUF);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -52,6 +56,12 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i
|
||||
if (option == ALLOW_HALF_CLOSURE) {
|
||||
return (T) Boolean.valueOf(isAllowHalfClosure());
|
||||
}
|
||||
if (option == SO_SNDBUF) {
|
||||
return (T) Integer.valueOf(getSendBufferSize());
|
||||
}
|
||||
if (option == SO_RCVBUF) {
|
||||
return (T) Integer.valueOf(getReceiveBufferSize());
|
||||
}
|
||||
return super.getOption(option);
|
||||
}
|
||||
|
||||
@ -63,6 +73,10 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i
|
||||
setReadMode((DomainSocketReadMode) value);
|
||||
} else if (option == ALLOW_HALF_CLOSURE) {
|
||||
setAllowHalfClosure((Boolean) value);
|
||||
} else if (option == SO_SNDBUF) {
|
||||
setSendBufferSize((Integer) value);
|
||||
} else if (option == SO_RCVBUF) {
|
||||
setReceiveBufferSize((Integer) value);
|
||||
} else {
|
||||
return super.setOption(option, value);
|
||||
}
|
||||
@ -159,6 +173,40 @@ public final class KQueueDomainSocketChannelConfig extends KQueueChannelConfig i
|
||||
return mode;
|
||||
}
|
||||
|
||||
public int getSendBufferSize() {
|
||||
try {
|
||||
return ((KQueueDomainSocketChannel) channel).socket.getSendBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public KQueueDomainSocketChannelConfig setSendBufferSize(int sendBufferSize) {
|
||||
try {
|
||||
((KQueueDomainSocketChannel) channel).socket.setSendBufferSize(sendBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public int getReceiveBufferSize() {
|
||||
try {
|
||||
return ((KQueueDomainSocketChannel) channel).socket.getReceiveBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public KQueueDomainSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
|
||||
try {
|
||||
((KQueueDomainSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @see SocketChannelConfig#isAllowHalfClosure()
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user