diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java index 5ab6c5285e..c8384fffe9 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsNameResolver.java @@ -47,6 +47,7 @@ import io.netty.resolver.InetNameResolver; import io.netty.resolver.ResolvedAddressTypes; import io.netty.util.NetUtil; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; @@ -962,10 +963,32 @@ public class DnsNameResolver extends InetNameResolver { } } - private void doResolveAllUncached(String hostname, + private void doResolveAllUncached(final String hostname, + final DnsRecord[] additionals, + final Promise> promise, + final DnsCache resolveCache) { + // Call doResolveUncached0(...) in the EventLoop as we may need to submit multiple queries which would need + // to submit multiple Runnable at the end if we are not already on the EventLoop. + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + doResolveAllUncached0(hostname, additionals, promise, resolveCache); + } else { + executor.execute(new Runnable() { + @Override + public void run() { + doResolveAllUncached0(hostname, additionals, promise, resolveCache); + } + }); + } + } + + private void doResolveAllUncached0(String hostname, DnsRecord[] additionals, Promise> promise, DnsCache resolveCache) { + + assert executor().inEventLoop(); + final DnsServerAddressStream nameServerAddrs = dnsServerAddressStreamProvider.nameServerAddressStream(hostname); new DnsAddressResolveContext(this, hostname, additionals, nameServerAddrs, @@ -1014,8 +1037,8 @@ public class DnsNameResolver extends InetNameResolver { public Future> query( InetSocketAddress nameServerAddr, DnsQuestion question) { - return query0(nameServerAddr, question, EMPTY_ADDITIONALS, - ch.eventLoop().>newPromise()); + return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(), + ch.eventLoop().>newPromise()); } /** @@ -1024,8 +1047,8 @@ public class DnsNameResolver extends InetNameResolver { public Future> query( InetSocketAddress nameServerAddr, DnsQuestion question, Iterable additionals) { - return query0(nameServerAddr, question, toArray(additionals, false), - ch.eventLoop().>newPromise()); + return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(), + ch.eventLoop().>newPromise()); } /** @@ -1035,7 +1058,7 @@ public class DnsNameResolver extends InetNameResolver { InetSocketAddress nameServerAddr, DnsQuestion question, Promise> promise) { - return query0(nameServerAddr, question, EMPTY_ADDITIONALS, promise); + return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(), promise); } /** @@ -1046,7 +1069,7 @@ public class DnsNameResolver extends InetNameResolver { Iterable additionals, Promise> promise) { - return query0(nameServerAddr, question, toArray(additionals, false), promise); + return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(), promise); } /** @@ -1067,16 +1090,14 @@ public class DnsNameResolver extends InetNameResolver { return cause != null && cause.getCause() instanceof DnsNameResolverTimeoutException; } - final Future> query0( - InetSocketAddress nameServerAddr, DnsQuestion question, - DnsRecord[] additionals, - Promise> promise) { - return query0(nameServerAddr, question, additionals, ch.newPromise(), promise); + final void flushQueries() { + ch.flush(); } final Future> query0( InetSocketAddress nameServerAddr, DnsQuestion question, DnsRecord[] additionals, + boolean flush, ChannelPromise writePromise, Promise> promise) { assert !writePromise.isVoid(); @@ -1084,7 +1105,8 @@ public class DnsNameResolver extends InetNameResolver { final Promise> castPromise = cast( checkNotNull(promise, "promise")); try { - new DnsQueryContext(this, nameServerAddr, question, additionals, castPromise).query(writePromise); + new DnsQueryContext(this, nameServerAddr, question, additionals, castPromise) + .query(flush, writePromise); return castPromise; } catch (Exception e) { return castPromise.setFailure(e); diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java index 1805b85c4b..08bbcb6088 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java @@ -89,7 +89,7 @@ final class DnsQueryContext implements FutureListener>() { @Override public void operationComplete(Future future) { if (future.isSuccess()) { - writeQuery(query, writePromise); + // If the query is done in a late fashion (as the channel was not ready yet) we always flush + // to ensure we did not race with a previous flush() that was done when the Channel was not + // ready yet. + writeQuery(query, true, writePromise); } else { Throwable cause = future.cause(); promise.tryFailure(cause); @@ -132,8 +135,9 @@ final class DnsQueryContext implements FutureListener { name = mapping; } - DnsServerAddressStream nameServerAddressStream = getNameServers(name); + try { + DnsServerAddressStream nameServerAddressStream = getNameServers(name); - final int end = expectedTypes.length - 1; - for (int i = 0; i < end; ++i) { - if (!query(name, expectedTypes[i], nameServerAddressStream.duplicate(), promise)) { - return; + final int end = expectedTypes.length - 1; + for (int i = 0; i < end; ++i) { + if (!query(name, expectedTypes[i], nameServerAddressStream.duplicate(), false, promise)) { + return; + } } + query(name, expectedTypes[end], nameServerAddressStream, false, promise); + } finally { + // Now flush everything we submitted before. + parent.flushQueries(); } - query(name, expectedTypes[end], nameServerAddressStream, promise); } /** @@ -316,17 +321,11 @@ abstract class DnsResolveContext { } } - private void query(final DnsServerAddressStream nameServerAddrStream, final int nameServerAddrStreamIndex, - final DnsQuestion question, - final Promise> promise, Throwable cause) { - query(nameServerAddrStream, nameServerAddrStreamIndex, question, - parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question), promise, cause); - } - private void query(final DnsServerAddressStream nameServerAddrStream, final int nameServerAddrStreamIndex, final DnsQuestion question, final DnsQueryLifecycleObserver queryLifecycleObserver, + final boolean flush, final Promise> promise, final Throwable cause) { if (nameServerAddrStreamIndex >= nameServerAddrStream.size() || allowedQueries == 0 || promise.isCancelled()) { @@ -344,9 +343,12 @@ abstract class DnsResolveContext { return; } final ChannelPromise writePromise = parent.ch.newPromise(); - final Future> f = parent.query0( - nameServerAddr, question, additionals, writePromise, - parent.ch.eventLoop().>newPromise()); + final Promise> queryPromise = + parent.ch.eventLoop().newPromise(); + + final Future> f = + parent.query0(nameServerAddr, question, additionals, flush, writePromise, queryPromise); + queriesInProgress.add(f); queryLifecycleObserver.queryWritten(nameServerAddr, writePromise); @@ -376,7 +378,8 @@ abstract class DnsResolveContext { } else { // Server did not respond or I/O error occurred; try again. queryLifecycleObserver.queryFailed(queryCause); - query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, queryCause); + query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, + newDnsQueryLifecycleObserver(question), true, promise, queryCause); } } finally { tryToFinishResolve(nameServerAddrStream, nameServerAddrStreamIndex, question, @@ -417,11 +420,11 @@ abstract class DnsResolveContext { DnsServerAddressStream addressStream = new CombinedDnsServerAddressStream( nameServerAddr, resolvedAddresses, nameServerAddrStream); query(addressStream, nameServerAddrStreamIndex, question, - queryLifecycleObserver, promise, cause); + queryLifecycleObserver, true, promise, cause); } else { // Ignore the server and try the next one... query(nameServerAddrStream, nameServerAddrStreamIndex + 1, - question, queryLifecycleObserver, promise, cause); + question, queryLifecycleObserver, true, promise, cause); } } }); @@ -490,7 +493,7 @@ abstract class DnsResolveContext { // Retry with the next server if the server did not tell us that the domain does not exist. if (code != DnsResponseCode.NXDOMAIN) { query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, - queryLifecycleObserver.queryNoAnswer(code), promise, null); + queryLifecycleObserver.queryNoAnswer(code), true, promise, null); } else { queryLifecycleObserver.queryFailed(NXDOMAIN_QUERY_FAILED_EXCEPTION); } @@ -539,7 +542,7 @@ abstract class DnsResolveContext { if (serverStream != null) { query(serverStream, 0, question, queryLifecycleObserver.queryRedirected(new DnsAddressStreamList(serverStream)), - promise, null); + true, promise, null); return true; } } @@ -687,8 +690,7 @@ abstract class DnsResolveContext { } else { queryLifecycleObserver.querySucceed(); // We also got a CNAME so we need to ensure we also query it. - onResponseCNAME(question, cnames, - parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question), promise); + onResponseCNAME(question, cnames, newDnsQueryLifecycleObserver(question), promise); } } @@ -776,10 +778,11 @@ abstract class DnsResolveContext { if (queryLifecycleObserver == NoopDnsQueryLifecycleObserver.INSTANCE) { // If the queryLifecycleObserver has already been terminated we should create a new one for this // fresh query. - query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, cause); + query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, + newDnsQueryLifecycleObserver(question), true, promise, cause); } else { query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, queryLifecycleObserver, - promise, cause); + true, promise, cause); } return; } @@ -795,7 +798,7 @@ abstract class DnsResolveContext { // As the last resort, try to query CNAME, just in case the name server has it. triedCNAME = true; - query(hostname, DnsRecordType.CNAME, getNameServers(hostname), promise); + query(hostname, DnsRecordType.CNAME, getNameServers(hostname), true, promise); return; } } else { @@ -891,11 +894,12 @@ abstract class DnsResolveContext { PlatformDependent.throwException(cause); return; } - query(stream, 0, cnameQuestion, queryLifecycleObserver.queryCNAMEd(cnameQuestion), promise, null); + query(stream, 0, cnameQuestion, queryLifecycleObserver.queryCNAMEd(cnameQuestion), + true, promise, null); } private boolean query(String hostname, DnsRecordType type, DnsServerAddressStream dnsServerAddressStream, - Promise> promise) { + boolean flush, Promise> promise) { final DnsQuestion question; try { question = new DefaultDnsQuestion(hostname, type, dnsClass); @@ -906,10 +910,14 @@ abstract class DnsResolveContext { type + ']', cause)); return false; } - query(dnsServerAddressStream, 0, question, promise, null); + query(dnsServerAddressStream, 0, question, newDnsQueryLifecycleObserver(question), flush, promise, null); return true; } + private DnsQueryLifecycleObserver newDnsQueryLifecycleObserver(DnsQuestion question) { + return parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question); + } + private final class CombinedDnsServerAddressStream implements DnsServerAddressStream { private final InetSocketAddress replaced; private final DnsServerAddressStream originalStream;