Combine flushes in DnsNameResolver to allow usage of sendmmsg to reduce syscall costs (#8470)

Motivation:

Some of transports support gathering writes when using datagrams. For example this is the case for EpollDatagramChannel. We should minimize the calls to flush() to allow making efficient usage of sendmmsg in this case.

Modifications:

- minimize flush() operations when we query for multiple address types.
- reduce GC by always directly schedule doResolveAll0(...) on the EventLoop.

Result:

Be able to use sendmmsg internally in the DnsNameResolver.
This commit is contained in:
Norman Maurer 2018-11-21 06:42:40 +01:00
parent 63b0f78e56
commit 99aa51b74a
3 changed files with 83 additions and 49 deletions

View File

@ -47,6 +47,7 @@ import io.netty.resolver.InetNameResolver;
import io.netty.resolver.ResolvedAddressTypes;
import io.netty.util.NetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
@ -962,10 +963,32 @@ public class DnsNameResolver extends InetNameResolver {
}
}
private void doResolveAllUncached(String hostname,
private void doResolveAllUncached(final String hostname,
final DnsRecord[] additionals,
final Promise<List<InetAddress>> promise,
final DnsCache resolveCache) {
// Call doResolveUncached0(...) in the EventLoop as we may need to submit multiple queries which would need
// to submit multiple Runnable at the end if we are not already on the EventLoop.
EventExecutor executor = executor();
if (executor.inEventLoop()) {
doResolveAllUncached0(hostname, additionals, promise, resolveCache);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
doResolveAllUncached0(hostname, additionals, promise, resolveCache);
}
});
}
}
private void doResolveAllUncached0(String hostname,
DnsRecord[] additionals,
Promise<List<InetAddress>> promise,
DnsCache resolveCache) {
assert executor().inEventLoop();
final DnsServerAddressStream nameServerAddrs =
dnsServerAddressStreamProvider.nameServerAddressStream(hostname);
new DnsAddressResolveContext(this, hostname, additionals, nameServerAddrs,
@ -1014,7 +1037,7 @@ public class DnsNameResolver extends InetNameResolver {
public Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query(
InetSocketAddress nameServerAddr, DnsQuestion question) {
return query0(nameServerAddr, question, EMPTY_ADDITIONALS,
return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(),
ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
}
@ -1024,7 +1047,7 @@ public class DnsNameResolver extends InetNameResolver {
public Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query(
InetSocketAddress nameServerAddr, DnsQuestion question, Iterable<DnsRecord> additionals) {
return query0(nameServerAddr, question, toArray(additionals, false),
return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(),
ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
}
@ -1035,7 +1058,7 @@ public class DnsNameResolver extends InetNameResolver {
InetSocketAddress nameServerAddr, DnsQuestion question,
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {
return query0(nameServerAddr, question, EMPTY_ADDITIONALS, promise);
return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(), promise);
}
/**
@ -1046,7 +1069,7 @@ public class DnsNameResolver extends InetNameResolver {
Iterable<DnsRecord> additionals,
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {
return query0(nameServerAddr, question, toArray(additionals, false), promise);
return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(), promise);
}
/**
@ -1067,16 +1090,14 @@ public class DnsNameResolver extends InetNameResolver {
return cause != null && cause.getCause() instanceof DnsNameResolverTimeoutException;
}
final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query0(
InetSocketAddress nameServerAddr, DnsQuestion question,
DnsRecord[] additionals,
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {
return query0(nameServerAddr, question, additionals, ch.newPromise(), promise);
final void flushQueries() {
ch.flush();
}
final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query0(
InetSocketAddress nameServerAddr, DnsQuestion question,
DnsRecord[] additionals,
boolean flush,
ChannelPromise writePromise,
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {
assert !writePromise.isVoid();
@ -1084,7 +1105,8 @@ public class DnsNameResolver extends InetNameResolver {
final Promise<AddressedEnvelope<DnsResponse, InetSocketAddress>> castPromise = cast(
checkNotNull(promise, "promise"));
try {
new DnsQueryContext(this, nameServerAddr, question, additionals, castPromise).query(writePromise);
new DnsQueryContext(this, nameServerAddr, question, additionals, castPromise)
.query(flush, writePromise);
return castPromise;
} catch (Exception e) {
return castPromise.setFailure(e);

View File

@ -89,7 +89,7 @@ final class DnsQueryContext implements FutureListener<AddressedEnvelope<DnsRespo
return question;
}
void query(ChannelPromise writePromise) {
void query(boolean flush, ChannelPromise writePromise) {
final DnsQuestion question = question();
final InetSocketAddress nameServerAddr = nameServerAddr();
final DatagramDnsQuery query = new DatagramDnsQuery(null, nameServerAddr, id);
@ -110,18 +110,21 @@ final class DnsQueryContext implements FutureListener<AddressedEnvelope<DnsRespo
logger.debug("{} WRITE: [{}: {}], {}", parent.ch, id, nameServerAddr, question);
}
sendQuery(query, writePromise);
sendQuery(query, flush, writePromise);
}
private void sendQuery(final DnsQuery query, final ChannelPromise writePromise) {
private void sendQuery(final DnsQuery query, final boolean flush, final ChannelPromise writePromise) {
if (parent.channelFuture.isDone()) {
writeQuery(query, writePromise);
writeQuery(query, flush, writePromise);
} else {
parent.channelFuture.addListener(new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(Future<? super Channel> future) {
if (future.isSuccess()) {
writeQuery(query, writePromise);
// If the query is done in a late fashion (as the channel was not ready yet) we always flush
// to ensure we did not race with a previous flush() that was done when the Channel was not
// ready yet.
writeQuery(query, true, writePromise);
} else {
Throwable cause = future.cause();
promise.tryFailure(cause);
@ -132,8 +135,9 @@ final class DnsQueryContext implements FutureListener<AddressedEnvelope<DnsRespo
}
}
private void writeQuery(final DnsQuery query, final ChannelPromise writePromise) {
final ChannelFuture writeFuture = parent.ch.writeAndFlush(query, writePromise);
private void writeQuery(final DnsQuery query, final boolean flush, final ChannelPromise writePromise) {
final ChannelFuture writeFuture = flush ? parent.ch.writeAndFlush(query, writePromise) :
parent.ch.write(query, writePromise);
if (writeFuture.isDone()) {
onQueryWriteCompletion(writeFuture);
} else {

View File

@ -261,15 +261,20 @@ abstract class DnsResolveContext<T> {
name = mapping;
}
try {
DnsServerAddressStream nameServerAddressStream = getNameServers(name);
final int end = expectedTypes.length - 1;
for (int i = 0; i < end; ++i) {
if (!query(name, expectedTypes[i], nameServerAddressStream.duplicate(), promise)) {
if (!query(name, expectedTypes[i], nameServerAddressStream.duplicate(), false, promise)) {
return;
}
}
query(name, expectedTypes[end], nameServerAddressStream, promise);
query(name, expectedTypes[end], nameServerAddressStream, false, promise);
} finally {
// Now flush everything we submitted before.
parent.flushQueries();
}
}
/**
@ -316,17 +321,11 @@ abstract class DnsResolveContext<T> {
}
}
private void query(final DnsServerAddressStream nameServerAddrStream, final int nameServerAddrStreamIndex,
final DnsQuestion question,
final Promise<List<T>> promise, Throwable cause) {
query(nameServerAddrStream, nameServerAddrStreamIndex, question,
parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question), promise, cause);
}
private void query(final DnsServerAddressStream nameServerAddrStream,
final int nameServerAddrStreamIndex,
final DnsQuestion question,
final DnsQueryLifecycleObserver queryLifecycleObserver,
final boolean flush,
final Promise<List<T>> promise,
final Throwable cause) {
if (nameServerAddrStreamIndex >= nameServerAddrStream.size() || allowedQueries == 0 || promise.isCancelled()) {
@ -344,9 +343,12 @@ abstract class DnsResolveContext<T> {
return;
}
final ChannelPromise writePromise = parent.ch.newPromise();
final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> f = parent.query0(
nameServerAddr, question, additionals, writePromise,
parent.ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
final Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> queryPromise =
parent.ch.eventLoop().newPromise();
final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> f =
parent.query0(nameServerAddr, question, additionals, flush, writePromise, queryPromise);
queriesInProgress.add(f);
queryLifecycleObserver.queryWritten(nameServerAddr, writePromise);
@ -376,7 +378,8 @@ abstract class DnsResolveContext<T> {
} else {
// Server did not respond or I/O error occurred; try again.
queryLifecycleObserver.queryFailed(queryCause);
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, queryCause);
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question,
newDnsQueryLifecycleObserver(question), true, promise, queryCause);
}
} finally {
tryToFinishResolve(nameServerAddrStream, nameServerAddrStreamIndex, question,
@ -417,11 +420,11 @@ abstract class DnsResolveContext<T> {
DnsServerAddressStream addressStream = new CombinedDnsServerAddressStream(
nameServerAddr, resolvedAddresses, nameServerAddrStream);
query(addressStream, nameServerAddrStreamIndex, question,
queryLifecycleObserver, promise, cause);
queryLifecycleObserver, true, promise, cause);
} else {
// Ignore the server and try the next one...
query(nameServerAddrStream, nameServerAddrStreamIndex + 1,
question, queryLifecycleObserver, promise, cause);
question, queryLifecycleObserver, true, promise, cause);
}
}
});
@ -490,7 +493,7 @@ abstract class DnsResolveContext<T> {
// Retry with the next server if the server did not tell us that the domain does not exist.
if (code != DnsResponseCode.NXDOMAIN) {
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question,
queryLifecycleObserver.queryNoAnswer(code), promise, null);
queryLifecycleObserver.queryNoAnswer(code), true, promise, null);
} else {
queryLifecycleObserver.queryFailed(NXDOMAIN_QUERY_FAILED_EXCEPTION);
}
@ -539,7 +542,7 @@ abstract class DnsResolveContext<T> {
if (serverStream != null) {
query(serverStream, 0, question,
queryLifecycleObserver.queryRedirected(new DnsAddressStreamList(serverStream)),
promise, null);
true, promise, null);
return true;
}
}
@ -687,8 +690,7 @@ abstract class DnsResolveContext<T> {
} else {
queryLifecycleObserver.querySucceed();
// We also got a CNAME so we need to ensure we also query it.
onResponseCNAME(question, cnames,
parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question), promise);
onResponseCNAME(question, cnames, newDnsQueryLifecycleObserver(question), promise);
}
}
@ -776,10 +778,11 @@ abstract class DnsResolveContext<T> {
if (queryLifecycleObserver == NoopDnsQueryLifecycleObserver.INSTANCE) {
// If the queryLifecycleObserver has already been terminated we should create a new one for this
// fresh query.
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, cause);
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question,
newDnsQueryLifecycleObserver(question), true, promise, cause);
} else {
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, queryLifecycleObserver,
promise, cause);
true, promise, cause);
}
return;
}
@ -795,7 +798,7 @@ abstract class DnsResolveContext<T> {
// As the last resort, try to query CNAME, just in case the name server has it.
triedCNAME = true;
query(hostname, DnsRecordType.CNAME, getNameServers(hostname), promise);
query(hostname, DnsRecordType.CNAME, getNameServers(hostname), true, promise);
return;
}
} else {
@ -891,11 +894,12 @@ abstract class DnsResolveContext<T> {
PlatformDependent.throwException(cause);
return;
}
query(stream, 0, cnameQuestion, queryLifecycleObserver.queryCNAMEd(cnameQuestion), promise, null);
query(stream, 0, cnameQuestion, queryLifecycleObserver.queryCNAMEd(cnameQuestion),
true, promise, null);
}
private boolean query(String hostname, DnsRecordType type, DnsServerAddressStream dnsServerAddressStream,
Promise<List<T>> promise) {
boolean flush, Promise<List<T>> promise) {
final DnsQuestion question;
try {
question = new DefaultDnsQuestion(hostname, type, dnsClass);
@ -906,10 +910,14 @@ abstract class DnsResolveContext<T> {
type + ']', cause));
return false;
}
query(dnsServerAddressStream, 0, question, promise, null);
query(dnsServerAddressStream, 0, question, newDnsQueryLifecycleObserver(question), flush, promise, null);
return true;
}
private DnsQueryLifecycleObserver newDnsQueryLifecycleObserver(DnsQuestion question) {
return parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question);
}
private final class CombinedDnsServerAddressStream implements DnsServerAddressStream {
private final InetSocketAddress replaced;
private final DnsServerAddressStream originalStream;