diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsAddressResolverGroup.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsAddressResolverGroup.java index 4fb88c44d8..1f666076c7 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsAddressResolverGroup.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsAddressResolverGroup.java @@ -22,13 +22,20 @@ import io.netty.channel.ReflectiveChannelFactory; import io.netty.channel.socket.DatagramChannel; import io.netty.resolver.AddressResolver; import io.netty.resolver.AddressResolverGroup; +import io.netty.resolver.InetSocketAddressResolver; +import io.netty.resolver.NameResolver; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Promise; import io.netty.util.internal.StringUtil; import io.netty.util.internal.UnstableApi; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.ConcurrentMap; import static io.netty.resolver.dns.DnsNameResolver.ANY_LOCAL_ADDR; +import static io.netty.util.internal.PlatformDependent.newConcurrentHashMap; /** * A {@link AddressResolverGroup} of {@link DnsNameResolver}s. @@ -40,6 +47,9 @@ public class DnsAddressResolverGroup extends AddressResolverGroup> resolvesInProgress = newConcurrentHashMap(); + private final ConcurrentMap>> resolveAllsInProgress = newConcurrentHashMap(); + public DnsAddressResolverGroup( Class channelType, DnsServerAddresses nameServerAddresses) { this(channelType, ANY_LOCAL_ADDR, nameServerAddresses); @@ -83,11 +93,16 @@ public class DnsAddressResolverGroup extends AddressResolverGroup channelFactory, InetSocketAddress localAddress, DnsServerAddresses nameServerAddresses) throws Exception { - return new DnsNameResolverBuilder(eventLoop) - .channelFactory(channelFactory) - .localAddress(localAddress) - .nameServerAddresses(nameServerAddresses) - .build() - .asAddressResolver(); + final NameResolver resolver = new InflightNameResolver( + eventLoop, + new DnsNameResolverBuilder(eventLoop) + .channelFactory(channelFactory) + .localAddress(localAddress) + .nameServerAddresses(nameServerAddresses) + .build(), + resolvesInProgress, + resolveAllsInProgress); + + return new InetSocketAddressResolver(eventLoop, resolver); } } diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/InflightNameResolver.java b/resolver-dns/src/main/java/io/netty/resolver/dns/InflightNameResolver.java new file mode 100644 index 0000000000..cb93f9a212 --- /dev/null +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/InflightNameResolver.java @@ -0,0 +1,131 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.resolver.dns; + +import io.netty.resolver.NameResolver; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; +import io.netty.util.internal.StringUtil; + +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +import static io.netty.util.internal.ObjectUtil.checkNotNull; + +// FIXME(trustin): Find a better name and move it to the 'resolver' module. +final class InflightNameResolver implements NameResolver { + + private final EventExecutor executor; + private final NameResolver delegate; + private final ConcurrentMap> resolvesInProgress; + private final ConcurrentMap>> resolveAllsInProgress; + + InflightNameResolver(EventExecutor executor, NameResolver delegate, + ConcurrentMap> resolvesInProgress, + ConcurrentMap>> resolveAllsInProgress) { + + this.executor = checkNotNull(executor, "executor"); + this.delegate = checkNotNull(delegate, "delegate"); + this.resolvesInProgress = checkNotNull(resolvesInProgress, "resolvesInProgress"); + this.resolveAllsInProgress = checkNotNull(resolveAllsInProgress, "resolveAllsInProgress"); + } + + @Override + public Future resolve(String inetHost) { + return resolve(inetHost, executor.newPromise()); + } + + @Override + public Future> resolveAll(String inetHost) { + return resolveAll(inetHost, executor.>newPromise()); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public Promise resolve(String inetHost, Promise promise) { + return resolve(resolvesInProgress, inetHost, promise, false); + } + + @Override + public Promise> resolveAll(String inetHost, Promise> promise) { + return resolve(resolveAllsInProgress, inetHost, promise, true); + } + + private Promise resolve( + final ConcurrentMap> resolveMap, + final String inetHost, final Promise promise, boolean resolveAll) { + + final Promise earlyPromise = resolveMap.putIfAbsent(inetHost, promise); + if (earlyPromise != null) { + // Name resolution for the specified inetHost is in progress already. + if (earlyPromise.isDone()) { + transferResult(earlyPromise, promise); + } else { + earlyPromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future f) throws Exception { + transferResult(f, promise); + } + }); + } + } else { + try { + if (resolveAll) { + @SuppressWarnings("unchecked") + final Promise> castPromise = (Promise>) promise; // U is List + delegate.resolveAll(inetHost, castPromise); + } else { + @SuppressWarnings("unchecked") + final Promise castPromise = (Promise) promise; // U is T + delegate.resolve(inetHost, castPromise); + } + } finally { + if (promise.isDone()) { + resolveMap.remove(inetHost); + } else { + promise.addListener(new FutureListener() { + @Override + public void operationComplete(Future f) throws Exception { + resolveMap.remove(inetHost); + } + }); + } + } + } + + return promise; + } + + private static void transferResult(Future src, Promise dst) { + if (src.isSuccess()) { + dst.trySuccess(src.getNow()); + } else { + dst.tryFailure(src.cause()); + } + } + + @Override + public String toString() { + return StringUtil.simpleClassName(this) + '(' + delegate + ')'; + } +}