From d728a72e745d840da397e287ab9c59f810cdadb4 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 21 Nov 2018 06:42:40 +0100 Subject: [PATCH] Combine flushes in DnsNameResolver to allow usage of sendmmsg to reduce syscall costs (#8470) Motivation: Some of transports support gathering writes when using datagrams. For example this is the case for EpollDatagramChannel. We should minimize the calls to flush() to allow making efficient usage of sendmmsg in this case. Modifications: - minimize flush() operations when we query for multiple address types. - reduce GC by always directly schedule doResolveAll0(...) on the EventLoop. Result: Be able to use sendmmsg internally in the DnsNameResolver. --- .../netty/resolver/dns/DnsNameResolver.java | 48 ++++++++++---- .../netty/resolver/dns/DnsQueryContext.java | 18 +++-- .../netty/resolver/dns/DnsResolveContext.java | 66 +++++++++++-------- 3 files changed, 83 insertions(+), 49 deletions(-) diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java index 5ab6c5285e..c8384fffe9 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java @@ -47,6 +47,7 @@ import io.netty.resolver.InetNameResolver; import io.netty.resolver.ResolvedAddressTypes; import io.netty.util.NetUtil; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -962,10 +963,32 @@ public class DnsNameResolver extends InetNameResolver { } } - private void doResolveAllUncached(String hostname, + private void doResolveAllUncached(final String hostname, + final DnsRecord[] additionals, + final Promise> promise, + final DnsCache resolveCache) { + // Call doResolveUncached0(...) in the EventLoop as we may need to submit multiple queries which would need + // to submit multiple Runnable at the end if we are not already on the EventLoop. + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + doResolveAllUncached0(hostname, additionals, promise, resolveCache); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + doResolveAllUncached0(hostname, additionals, promise, resolveCache); + } + }); + } + } + + private void doResolveAllUncached0(String hostname, DnsRecord[] additionals, Promise> promise, DnsCache resolveCache) { + + assert executor().inEventLoop(); + final DnsServerAddressStream nameServerAddrs = dnsServerAddressStreamProvider.nameServerAddressStream(hostname); new DnsAddressResolveContext(this, hostname, additionals, nameServerAddrs, @@ -1014,8 +1037,8 @@ public class DnsNameResolver extends InetNameResolver { public Future> query( InetSocketAddress nameServerAddr, DnsQuestion question) { - return query0(nameServerAddr, question, EMPTY_ADDITIONALS, - ch.eventLoop().>newPromise()); + return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(), + ch.eventLoop().>newPromise()); } /** @@ -1024,8 +1047,8 @@ public class DnsNameResolver extends InetNameResolver { public Future> query( InetSocketAddress nameServerAddr, DnsQuestion question, Iterable additionals) { - return query0(nameServerAddr, question, toArray(additionals, false), - ch.eventLoop().>newPromise()); + return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(), + ch.eventLoop().>newPromise()); } /** @@ -1035,7 +1058,7 @@ public class DnsNameResolver extends InetNameResolver { InetSocketAddress nameServerAddr, DnsQuestion question, Promise> promise) { - return query0(nameServerAddr, question, EMPTY_ADDITIONALS, promise); + return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(), promise); } /** @@ -1046,7 +1069,7 @@ public class DnsNameResolver extends InetNameResolver { Iterable additionals, Promise> promise) { - return query0(nameServerAddr, question, toArray(additionals, false), promise); + return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(), promise); } /** @@ -1067,16 +1090,14 @@ public class DnsNameResolver extends InetNameResolver { return cause != null && cause.getCause() instanceof DnsNameResolverTimeoutException; } - final Future> query0( - InetSocketAddress nameServerAddr, DnsQuestion question, - DnsRecord[] additionals, - Promise> promise) { - return query0(nameServerAddr, question, additionals, ch.newPromise(), promise); + final void flushQueries() { + ch.flush(); } final Future> query0( InetSocketAddress nameServerAddr, DnsQuestion question, DnsRecord[] additionals, + boolean flush, ChannelPromise writePromise, Promise> promise) { assert !writePromise.isVoid(); @@ -1084,7 +1105,8 @@ public class DnsNameResolver extends InetNameResolver { final Promise> castPromise = cast( checkNotNull(promise, "promise")); try { - new DnsQueryContext(this, nameServerAddr, question, additionals, castPromise).query(writePromise); + new DnsQueryContext(this, nameServerAddr, question, additionals, castPromise) + .query(flush, writePromise); return castPromise; } catch (Exception e) { return castPromise.setFailure(e); diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java index 1805b85c4b..08bbcb6088 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java @@ -89,7 +89,7 @@ final class DnsQueryContext implements FutureListener>() { @Override public void operationComplete(Future future) { if (future.isSuccess()) { - writeQuery(query, writePromise); + // If the query is done in a late fashion (as the channel was not ready yet) we always flush + // to ensure we did not race with a previous flush() that was done when the Channel was not + // ready yet. + writeQuery(query, true, writePromise); } else { Throwable cause = future.cause(); promise.tryFailure(cause); @@ -132,8 +135,9 @@ final class DnsQueryContext implements FutureListener { name = mapping; } - DnsServerAddressStream nameServerAddressStream = getNameServers(name); + try { + DnsServerAddressStream nameServerAddressStream = getNameServers(name); - final int end = expectedTypes.length - 1; - for (int i = 0; i < end; ++i) { - if (!query(name, expectedTypes[i], nameServerAddressStream.duplicate(), promise)) { - return; + final int end = expectedTypes.length - 1; + for (int i = 0; i < end; ++i) { + if (!query(name, expectedTypes[i], nameServerAddressStream.duplicate(), false, promise)) { + return; + } } + query(name, expectedTypes[end], nameServerAddressStream, false, promise); + } finally { + // Now flush everything we submitted before. + parent.flushQueries(); } - query(name, expectedTypes[end], nameServerAddressStream, promise); } /** @@ -316,17 +321,11 @@ abstract class DnsResolveContext { } } - private void query(final DnsServerAddressStream nameServerAddrStream, final int nameServerAddrStreamIndex, - final DnsQuestion question, - final Promise> promise, Throwable cause) { - query(nameServerAddrStream, nameServerAddrStreamIndex, question, - parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question), promise, cause); - } - private void query(final DnsServerAddressStream nameServerAddrStream, final int nameServerAddrStreamIndex, final DnsQuestion question, final DnsQueryLifecycleObserver queryLifecycleObserver, + final boolean flush, final Promise> promise, final Throwable cause) { if (nameServerAddrStreamIndex >= nameServerAddrStream.size() || allowedQueries == 0 || promise.isCancelled()) { @@ -344,9 +343,12 @@ abstract class DnsResolveContext { return; } final ChannelPromise writePromise = parent.ch.newPromise(); - final Future> f = parent.query0( - nameServerAddr, question, additionals, writePromise, - parent.ch.eventLoop().>newPromise()); + final Promise> queryPromise = + parent.ch.eventLoop().newPromise(); + + final Future> f = + parent.query0(nameServerAddr, question, additionals, flush, writePromise, queryPromise); + queriesInProgress.add(f); queryLifecycleObserver.queryWritten(nameServerAddr, writePromise); @@ -376,7 +378,8 @@ abstract class DnsResolveContext { } else { // Server did not respond or I/O error occurred; try again. queryLifecycleObserver.queryFailed(queryCause); - query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, queryCause); + query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, + newDnsQueryLifecycleObserver(question), true, promise, queryCause); } } finally { tryToFinishResolve(nameServerAddrStream, nameServerAddrStreamIndex, question, @@ -417,11 +420,11 @@ abstract class DnsResolveContext { DnsServerAddressStream addressStream = new CombinedDnsServerAddressStream( nameServerAddr, resolvedAddresses, nameServerAddrStream); query(addressStream, nameServerAddrStreamIndex, question, - queryLifecycleObserver, promise, cause); + queryLifecycleObserver, true, promise, cause); } else { // Ignore the server and try the next one... query(nameServerAddrStream, nameServerAddrStreamIndex + 1, - question, queryLifecycleObserver, promise, cause); + question, queryLifecycleObserver, true, promise, cause); } } }); @@ -490,7 +493,7 @@ abstract class DnsResolveContext { // Retry with the next server if the server did not tell us that the domain does not exist. if (code != DnsResponseCode.NXDOMAIN) { query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, - queryLifecycleObserver.queryNoAnswer(code), promise, null); + queryLifecycleObserver.queryNoAnswer(code), true, promise, null); } else { queryLifecycleObserver.queryFailed(NXDOMAIN_QUERY_FAILED_EXCEPTION); } @@ -539,7 +542,7 @@ abstract class DnsResolveContext { if (serverStream != null) { query(serverStream, 0, question, queryLifecycleObserver.queryRedirected(new DnsAddressStreamList(serverStream)), - promise, null); + true, promise, null); return true; } } @@ -687,8 +690,7 @@ abstract class DnsResolveContext { } else { queryLifecycleObserver.querySucceed(); // We also got a CNAME so we need to ensure we also query it. - onResponseCNAME(question, cnames, - parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question), promise); + onResponseCNAME(question, cnames, newDnsQueryLifecycleObserver(question), promise); } } @@ -776,10 +778,11 @@ abstract class DnsResolveContext { if (queryLifecycleObserver == NoopDnsQueryLifecycleObserver.INSTANCE) { // If the queryLifecycleObserver has already been terminated we should create a new one for this // fresh query. - query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, cause); + query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, + newDnsQueryLifecycleObserver(question), true, promise, cause); } else { query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, queryLifecycleObserver, - promise, cause); + true, promise, cause); } return; } @@ -795,7 +798,7 @@ abstract class DnsResolveContext { // As the last resort, try to query CNAME, just in case the name server has it. triedCNAME = true; - query(hostname, DnsRecordType.CNAME, getNameServers(hostname), promise); + query(hostname, DnsRecordType.CNAME, getNameServers(hostname), true, promise); return; } } else { @@ -891,11 +894,12 @@ abstract class DnsResolveContext { PlatformDependent.throwException(cause); return; } - query(stream, 0, cnameQuestion, queryLifecycleObserver.queryCNAMEd(cnameQuestion), promise, null); + query(stream, 0, cnameQuestion, queryLifecycleObserver.queryCNAMEd(cnameQuestion), + true, promise, null); } private boolean query(String hostname, DnsRecordType type, DnsServerAddressStream dnsServerAddressStream, - Promise> promise) { + boolean flush, Promise> promise) { final DnsQuestion question; try { question = new DefaultDnsQuestion(hostname, type, dnsClass); @@ -906,10 +910,14 @@ abstract class DnsResolveContext { type + ']', cause)); return false; } - query(dnsServerAddressStream, 0, question, promise, null); + query(dnsServerAddressStream, 0, question, newDnsQueryLifecycleObserver(question), flush, promise, null); return true; } + private DnsQueryLifecycleObserver newDnsQueryLifecycleObserver(DnsQuestion question) { + return parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question); + } + private final class CombinedDnsServerAddressStream implements DnsServerAddressStream { private final InetSocketAddress replaced; private final DnsServerAddressStream originalStream;