diff options
author | Hadrien Zalek <hzalek@google.com> | 2020-07-24 12:30:10 -0700 |
---|---|---|
committer | Hadrien Zalek <hzalek@google.com> | 2020-07-24 23:35:41 +0000 |
commit | 9b4675b8aba18f88cdb08004aac053ce16c4968a (patch) | |
tree | 900389d86d6c0d165b93722f60de46164ecc6255 /core | |
parent | 332041b0592239d4bfe59bfd316f28c02f523570 (diff) | |
parent | 57043233bf5aecce92f0c6629b6ac46d9393ce8c (diff) | |
download | grpc-grpc-java-9b4675b8aba18f88cdb08004aac053ce16c4968a.tar.gz |
Merge tag 'upstream/v1.16.1' into HEAD
Update the Java gRPC implementation source to that of a released version
(v1.16.1) instead of some intermediate commit after v1.15.0.
Test: m grpc-java
Bug: 148404241
Change-Id: I9c072aee054a4aecc1bdf39adf45e9a243b907f5
Diffstat (limited to 'core')
28 files changed, 1365 insertions, 576 deletions
diff --git a/core/build.gradle b/core/build.gradle index 700592f86..4a2429098 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -3,12 +3,19 @@ description = 'gRPC: Core' dependencies { compile project(':grpc-context'), libraries.gson, - libraries.guava, libraries.errorprone, libraries.jsr305, libraries.animalsniffer_annotations + compile (libraries.guava) { + // prefer 2.2.0 from libraries instead of 2.1.3 + exclude group: 'com.google.errorprone', module: 'error_prone_annotations' + // prefer 3.0.2 from libraries instead of 3.0.1 + exclude group: 'com.google.code.findbugs', module: 'jsr305' + // prefer 1.17 from libraries instead of 1.14 + exclude group: 'org.codehaus.mojo', module: 'animal-sniffer-annotations' + } compile (libraries.opencensus_api) { - // prefer 3.0.0 from libraries instead of 3.0.1 + // prefer 3.0.2 from libraries instead of 3.0.1 exclude group: 'com.google.code.findbugs', module: 'jsr305' // prefer 20.0 from libraries instead of 19.0 exclude group: 'com.google.guava', module: 'guava' @@ -16,7 +23,7 @@ dependencies { exclude group: 'io.grpc', module: 'grpc-context' } compile (libraries.opencensus_contrib_grpc_metrics) { - // prefer 3.0.0 from libraries instead of 3.0.1 + // prefer 3.0.2 from libraries instead of 3.0.1 exclude group: 'com.google.code.findbugs', module: 'jsr305' // we'll always be more up-to-date exclude group: 'io.grpc', module: 'grpc-context' diff --git a/core/src/main/java/io/grpc/Attributes.java b/core/src/main/java/io/grpc/Attributes.java index 9a13f5160..411d11c33 100644 --- a/core/src/main/java/io/grpc/Attributes.java +++ b/core/src/main/java/io/grpc/Attributes.java @@ -29,6 +29,18 @@ import javax.annotation.concurrent.Immutable; /** * An immutable type-safe container of attributes. + * + * <h3>Annotation semantics</h3> + * + * <p>As a convention, annotations such as {@link Grpc.TransportAttr} is defined to associate + * attribute {@link Key}s and their propagation paths. The annotation may be applied to a {@code + * Key} definition field, a method that returns {@link Attributes}, or a variable of type {@link + * Attributes}, to indicate that the annotated {@link Attributes} objects may contain the annotated + * {@code Key}. + * + * <p>Javadoc users may click "USE" on the navigation bars of the annotation's javadoc page to view + * references of such annotation. + * * @since 1.13.0 */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1764") diff --git a/core/src/main/java/io/grpc/BinaryLog.java b/core/src/main/java/io/grpc/BinaryLog.java index 5c2d4ab2f..9d11b63cf 100644 --- a/core/src/main/java/io/grpc/BinaryLog.java +++ b/core/src/main/java/io/grpc/BinaryLog.java @@ -29,21 +29,4 @@ public abstract class BinaryLog implements Closeable { ServerMethodDefinition<ReqT, RespT> oMethodDef); public abstract Channel wrapChannel(Channel channel); - - /** - * A CallId is two byte[] arrays both of size 8 that uniquely identifies the RPC. Users are - * free to use the byte arrays however they see fit. - */ - public static final class CallId { - public final long hi; - public final long lo; - - /** - * Creates an instance. - */ - public CallId(long hi, long lo) { - this.hi = hi; - this.lo = lo; - } - } } diff --git a/core/src/main/java/io/grpc/CallCredentials.java b/core/src/main/java/io/grpc/CallCredentials.java index 7ce621993..03be65310 100644 --- a/core/src/main/java/io/grpc/CallCredentials.java +++ b/core/src/main/java/io/grpc/CallCredentials.java @@ -40,10 +40,15 @@ public interface CallCredentials { * The security level of the transport. It is guaranteed to be present in the {@code attrs} passed * to {@link #applyRequestMetadata}. It is by default {@link SecurityLevel#NONE} but can be * overridden by the transport. + * + * @deprecated transport implementations should use {@code + * io.grpc.internal.GrpcAttributes.ATTR_SECURITY_LEVEL} instead. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914") + @Grpc.TransportAttr + @Deprecated public static final Key<SecurityLevel> ATTR_SECURITY_LEVEL = - Key.create("io.grpc.CallCredentials.securityLevel"); + Key.create("io.grpc.internal.GrpcAttributes.securityLevel"); /** * The authority string used to authenticate the server. Usually it's the server's host name. It @@ -52,6 +57,8 @@ public interface CallCredentials { * io.grpc.CallOptions} with increasing precedence. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914") + @Grpc.TransportAttr + @Deprecated public static final Key<String> ATTR_AUTHORITY = Key.create("io.grpc.CallCredentials.authority"); /** @@ -71,8 +78,11 @@ public interface CallCredentials { * needs to perform blocking operations. * @param applier The outlet of the produced headers. It can be called either before or after this * method returns. + * + * @deprecated implement {@link CallCredentials2} instead. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914") + @Deprecated void applyRequestMetadata( MethodDescriptor<?, ?> method, Attributes attrs, Executor appExecutor, MetadataApplier applier); @@ -89,6 +99,7 @@ public interface CallCredentials { * * <p>Exactly one of its methods must be called to make the RPC proceed. */ + @Deprecated @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914") public interface MetadataApplier { /** @@ -102,4 +113,31 @@ public interface CallCredentials { */ void fail(Status status); } + + /** + * The request-related information passed to {@code CallCredentials2.applyRequestMetadata()}. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914") + public abstract static class RequestInfo { + /** + * The method descriptor of this RPC. + */ + public abstract MethodDescriptor<?, ?> getMethodDescriptor(); + + /** + * The security level on the transport. + */ + public abstract SecurityLevel getSecurityLevel(); + + /** + * Returns the authority string used to authenticate the server for this call. + */ + public abstract String getAuthority(); + + /** + * Returns the transport attributes. + */ + @Grpc.TransportAttr + public abstract Attributes getTransportAttrs(); + } } diff --git a/core/src/main/java/io/grpc/CallCredentials2.java b/core/src/main/java/io/grpc/CallCredentials2.java new file mode 100644 index 000000000..998df42db --- /dev/null +++ b/core/src/main/java/io/grpc/CallCredentials2.java @@ -0,0 +1,99 @@ +/* + * Copyright 2016 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.Executor; + +/** + * The new interface for {@link CallCredentials}. + * + * <p>THIS CLASS NAME IS TEMPORARY and is part of a migration. This class will BE DELETED as it + * replaces {@link CallCredentials} in short-term. THIS CLASS SHOULD ONLY BE REFERENCED BY + * IMPLEMENTIONS. All consumers should still reference {@link CallCredentials}. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4901") +public abstract class CallCredentials2 implements CallCredentials { + /** + * Pass the credential data to the given {@link CallCredentials.MetadataApplier}, which will + * propagate it to the request metadata. + * + * <p>It is called for each individual RPC, within the {@link Context} of the call, before the + * stream is about to be created on a transport. Implementations should not block in this + * method. If metadata is not immediately available, e.g., needs to be fetched from network, the + * implementation may give the {@code applier} to an asynchronous task which will eventually call + * the {@code applier}. The RPC proceeds only after the {@code applier} is called. + * + * @param requestInfo request-related information + * @param appExecutor The application thread-pool. It is provided to the implementation in case it + * needs to perform blocking operations. + * @param applier The outlet of the produced headers. It can be called either before or after this + * method returns. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914") + public abstract void applyRequestMetadata( + RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier); + + @Override + @SuppressWarnings("deprecation") + public final void applyRequestMetadata( + final MethodDescriptor<?, ?> method, final Attributes attrs, + Executor appExecutor, final CallCredentials.MetadataApplier applier) { + final String authority = checkNotNull(attrs.get(ATTR_AUTHORITY), "authority"); + final SecurityLevel securityLevel = + firstNonNull(attrs.get(ATTR_SECURITY_LEVEL), SecurityLevel.NONE); + RequestInfo requestInfo = new RequestInfo() { + @Override + public MethodDescriptor<?, ?> getMethodDescriptor() { + return method; + } + + @Override + public SecurityLevel getSecurityLevel() { + return securityLevel; + } + + @Override + public String getAuthority() { + return authority; + } + + @Override + public Attributes getTransportAttrs() { + return attrs; + } + }; + MetadataApplier applierAdapter = new MetadataApplier() { + @Override + public void apply(Metadata headers) { + applier.apply(headers); + } + + @Override + public void fail(Status status) { + applier.fail(status); + } + }; + applyRequestMetadata(requestInfo, appExecutor, applierAdapter); + } + + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914") + @SuppressWarnings("deprecation") + public abstract static class MetadataApplier implements CallCredentials.MetadataApplier {} +} diff --git a/core/src/main/java/io/grpc/ClientCall.java b/core/src/main/java/io/grpc/ClientCall.java index 936cb322b..2aa5034b7 100644 --- a/core/src/main/java/io/grpc/ClientCall.java +++ b/core/src/main/java/io/grpc/ClientCall.java @@ -262,12 +262,11 @@ public abstract class ClientCall<ReqT, RespT> { * or {@link Listener#onClose}. If called prematurely, the implementation may throw {@code * IllegalStateException} or return arbitrary {@code Attributes}. * - * <p>{@link Grpc} defines commonly used attributes, but they are not guaranteed to be present. - * * @return non-{@code null} attributes * @throws IllegalStateException (optional) if called before permitted */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/2607") + @Grpc.TransportAttr public Attributes getAttributes() { return Attributes.EMPTY; } diff --git a/core/src/main/java/io/grpc/EquivalentAddressGroup.java b/core/src/main/java/io/grpc/EquivalentAddressGroup.java index 2218a7fae..eabd2eafb 100644 --- a/core/src/main/java/io/grpc/EquivalentAddressGroup.java +++ b/core/src/main/java/io/grpc/EquivalentAddressGroup.java @@ -17,6 +17,9 @@ package io.grpc; import com.google.common.base.Preconditions; +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -50,7 +53,7 @@ public final class EquivalentAddressGroup { /** * List constructor with {@link Attributes}. */ - public EquivalentAddressGroup(List<SocketAddress> addrs, Attributes attrs) { + public EquivalentAddressGroup(List<SocketAddress> addrs, @Attr Attributes attrs) { Preconditions.checkArgument(!addrs.isEmpty(), "addrs is empty"); this.addrs = Collections.unmodifiableList(new ArrayList<>(addrs)); this.attrs = Preconditions.checkNotNull(attrs, "attrs"); @@ -69,7 +72,7 @@ public final class EquivalentAddressGroup { /** * Singleton constructor with Attributes. */ - public EquivalentAddressGroup(SocketAddress addr, Attributes attrs) { + public EquivalentAddressGroup(SocketAddress addr, @Attr Attributes attrs) { this(Collections.singletonList(addr), attrs); } @@ -83,6 +86,7 @@ public final class EquivalentAddressGroup { /** * Returns the attributes. */ + @Attr public Attributes getAttributes() { return attrs; } @@ -127,4 +131,13 @@ public final class EquivalentAddressGroup { } return true; } + + /** + * Annotation for {@link EquivalentAddressGroup}'s attributes. It follows the annotation semantics + * defined by {@link Attributes}. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4972") + @Retention(RetentionPolicy.SOURCE) + @Documented + public @interface Attr {} } diff --git a/core/src/main/java/io/grpc/Grpc.java b/core/src/main/java/io/grpc/Grpc.java index aef072606..53ff28be3 100644 --- a/core/src/main/java/io/grpc/Grpc.java +++ b/core/src/main/java/io/grpc/Grpc.java @@ -16,6 +16,9 @@ package io.grpc; +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; import java.net.SocketAddress; import javax.net.ssl.SSLSession; @@ -31,13 +34,32 @@ public final class Grpc { * Attribute key for the remote address of a transport. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710") + @TransportAttr public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_REMOTE_ADDR = - Attributes.Key.create("remote-addr"); + Attributes.Key.create("remote-addr"); + + /** + * Attribute key for the local address of a transport. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710") + @TransportAttr + public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_LOCAL_ADDR = + Attributes.Key.create("local-addr"); /** * Attribute key for SSL session of a transport. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710") + @TransportAttr public static final Attributes.Key<SSLSession> TRANSPORT_ATTR_SSL_SESSION = - Attributes.Key.create("ssl-session"); + Attributes.Key.create("ssl-session"); + + /** + * Annotation for transport attributes. It follows the annotation semantics defined + * by {@link Attributes}. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4972") + @Retention(RetentionPolicy.SOURCE) + @Documented + public @interface TransportAttr {} } diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 633bf5263..f1cef63fb 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -107,11 +107,12 @@ public abstract class LoadBalancer { * <p>Implementations should not modify the given {@code servers}. * * @param servers the resolved server addresses, never empty. - * @param attributes extra metadata from naming system. + * @param attributes extra information from naming system. * @since 1.2.0 */ public abstract void handleResolvedAddressGroups( - List<EquivalentAddressGroup> servers, Attributes attributes); + List<EquivalentAddressGroup> servers, + @NameResolver.ResolutionResultAttr Attributes attributes); /** * Handles an error from the name resolution system. @@ -154,6 +155,11 @@ public abstract class LoadBalancer { */ public abstract void shutdown(); + @Override + public String toString() { + return getClass().getSimpleName(); + } + /** * The main balancing logic. It <strong>must be thread-safe</strong>. Typically it should only * synchronize on its own state, and avoid synchronizing with the LoadBalancer's state. diff --git a/core/src/main/java/io/grpc/NameResolver.java b/core/src/main/java/io/grpc/NameResolver.java index 60f819e4d..845dcb02f 100644 --- a/core/src/main/java/io/grpc/NameResolver.java +++ b/core/src/main/java/io/grpc/NameResolver.java @@ -16,6 +16,9 @@ package io.grpc; +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; import java.net.URI; import java.util.List; import javax.annotation.Nullable; @@ -126,6 +129,7 @@ public abstract class NameResolver { * * @since 1.0.0 */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770") @ThreadSafe public interface Listener { /** @@ -134,10 +138,11 @@ public abstract class NameResolver { * <p>Implementations will not modify the given {@code servers}. * * @param servers the resolved server addresses. An empty list will trigger {@link #onError} - * @param attributes extra metadata from naming system + * @param attributes extra information from naming system. * @since 1.3.0 */ - void onAddresses(List<EquivalentAddressGroup> servers, Attributes attributes); + void onAddresses( + List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes); /** * Handles an error from the resolver. The listener is responsible for eventually invoking @@ -148,4 +153,13 @@ public abstract class NameResolver { */ void onError(Status error); } + + /** + * Annotation for name resolution result attributes. It follows the annotation semantics defined + * by {@link Attributes}. + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4972") + @Retention(RetentionPolicy.SOURCE) + @Documented + public @interface ResolutionResultAttr {} } diff --git a/core/src/main/java/io/grpc/ServerCall.java b/core/src/main/java/io/grpc/ServerCall.java index 106676fa7..07711e381 100644 --- a/core/src/main/java/io/grpc/ServerCall.java +++ b/core/src/main/java/io/grpc/ServerCall.java @@ -192,11 +192,11 @@ public abstract class ServerCall<ReqT, RespT> { * Returns properties of a single call. * * <p>Attributes originate from the transport and can be altered by {@link ServerTransportFilter}. - * {@link Grpc} defines commonly used attributes, but they are not guaranteed to be present. * * @return non-{@code null} Attributes container */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1779") + @Grpc.TransportAttr public Attributes getAttributes() { return Attributes.EMPTY; } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 7cf1a89ba..03e2e1a19 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -24,7 +24,6 @@ import com.google.common.base.MoreObjects; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; -import io.grpc.CallCredentials; import io.grpc.CallOptions; import io.grpc.Compressor; import io.grpc.Deadline; @@ -41,6 +40,7 @@ import io.grpc.Status; import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.NoopClientStream; @@ -91,7 +91,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @GuardedBy("this") private List<ServerStreamTracer.Factory> serverStreamTracerFactories; private final Attributes attributes = Attributes.newBuilder() - .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) .build(); public InProcessTransport(String name, String authority, String userAgent) { @@ -132,6 +132,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans synchronized (InProcessTransport.this) { Attributes serverTransportAttrs = Attributes.newBuilder() .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name)) .build(); serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs); clientTransportListener.transportReady(); diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java index 80505bce3..a8f778a46 100644 --- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -16,11 +16,14 @@ package io.grpc.internal; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.InternalChannelz.ChannelTrace; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; @@ -32,6 +35,7 @@ import java.lang.reflect.Method; import java.util.List; import java.util.Locale; import java.util.Map; +import javax.annotation.CheckForNull; import javax.annotation.Nullable; final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { @@ -43,11 +47,20 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { static final String GRPCLB_LOAD_BALANCER_FACTORY_NAME = "io.grpc.grpclb.GrpclbLoadBalancerFactory"; - AutoConfiguredLoadBalancerFactory() {} + @Nullable + private final ChannelTracer channelTracer; + @Nullable + private final TimeProvider timeProvider; + + AutoConfiguredLoadBalancerFactory( + @Nullable ChannelTracer channelTracer, @Nullable TimeProvider timeProvider) { + this.channelTracer = channelTracer; + this.timeProvider = timeProvider; + } @Override public LoadBalancer newLoadBalancer(Helper helper) { - return new AutoConfiguredLoadBalancer(helper); + return new AutoConfiguredLoadBalancer(helper, channelTracer, timeProvider); } private static final class NoopLoadBalancer extends LoadBalancer { @@ -71,11 +84,21 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { private final Helper helper; private LoadBalancer delegate; private LoadBalancer.Factory delegateFactory; + @CheckForNull + private ChannelTracer channelTracer; + @Nullable + private final TimeProvider timeProvider; - AutoConfiguredLoadBalancer(Helper helper) { + AutoConfiguredLoadBalancer( + Helper helper, @Nullable ChannelTracer channelTracer, @Nullable TimeProvider timeProvider) { this.helper = helper; delegateFactory = PickFirstBalancerFactory.getInstance(); delegate = delegateFactory.newLoadBalancer(helper); + this.channelTracer = channelTracer; + this.timeProvider = timeProvider; + if (channelTracer != null) { + checkNotNull(timeProvider, "timeProvider"); + } } // Must be run inside ChannelExecutor. @@ -101,7 +124,15 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker()); delegate.shutdown(); delegateFactory = newlbf; + LoadBalancer old = delegate; delegate = delegateFactory.newLoadBalancer(helper); + if (channelTracer != null) { + channelTracer.reportEvent(new ChannelTrace.Event.Builder() + .setDescription("Load balancer changed from " + old + " to " + delegate) + .setSeverity(ChannelTrace.Event.Severity.CT_INFO) + .setTimestampNanos(timeProvider.currentTimeNanos()) + .build()); + } } getDelegate().handleResolvedAddressGroups(servers, attributes); } diff --git a/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java b/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java index df2e0a0bd..e5141cf1e 100644 --- a/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java +++ b/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java @@ -72,6 +72,7 @@ final class CallCredentialsApplyingTransportFactory implements ClientTransportFa } @Override + @SuppressWarnings("deprecation") public ClientStream newStream( MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) { CallCredentials creds = callOptions.getCredentials(); diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java index 490bd8255..dc1b783fd 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java @@ -20,7 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; import com.google.common.base.Verify; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; @@ -43,7 +43,6 @@ import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -85,27 +84,18 @@ final class DnsNameResolver extends NameResolver { private static final String JNDI_PROPERTY = System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_jndi", "true"); + private static final String JNDI_LOCALHOST_PROPERTY = + System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_jndi_localhost", "false"); private static final String JNDI_SRV_PROPERTY = System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_grpclb", "false"); private static final String JNDI_TXT_PROPERTY = System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_service_config", "false"); - /** - * Java networking system properties name for caching DNS result. - * - * <p>Default value is -1 (cache forever) if security manager is installed. If security manager is - * not installed, the ttl value is {@code null} which falls back to {@link - * #DEFAULT_NETWORK_CACHE_TTL_SECONDS gRPC default value}. - */ - @VisibleForTesting - static final String NETWORKADDRESS_CACHE_TTL_PROPERTY = "networkaddress.cache.ttl"; - /** Default DNS cache duration if network cache ttl value is not specified ({@code null}). */ - @VisibleForTesting - static final long DEFAULT_NETWORK_CACHE_TTL_SECONDS = 30; - @VisibleForTesting static boolean enableJndi = Boolean.parseBoolean(JNDI_PROPERTY); @VisibleForTesting + static boolean enableJndiLocalhost = Boolean.parseBoolean(JNDI_LOCALHOST_PROPERTY); + @VisibleForTesting static boolean enableSrv = Boolean.parseBoolean(JNDI_SRV_PROPERTY); @VisibleForTesting static boolean enableTxt = Boolean.parseBoolean(JNDI_TXT_PROPERTY); @@ -130,8 +120,6 @@ final class DnsNameResolver extends NameResolver { private final String host; private final int port; private final Resource<ExecutorService> executorResource; - private final long networkAddressCacheTtlNanos; - private final Stopwatch stopwatch; @GuardedBy("this") private boolean shutdown; @GuardedBy("this") @@ -140,11 +128,10 @@ final class DnsNameResolver extends NameResolver { private boolean resolving; @GuardedBy("this") private Listener listener; - private ResolutionResults cachedResolutionResults; DnsNameResolver(@Nullable String nsAuthority, String name, Attributes params, - Resource<ExecutorService> executorResource, ProxyDetector proxyDetector, - Stopwatch stopwatch) { + Resource<ExecutorService> executorResource, + ProxyDetector proxyDetector) { // TODO: if a DNS server is provided as nsAuthority, use it. // https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java this.executorResource = executorResource; @@ -167,8 +154,6 @@ final class DnsNameResolver extends NameResolver { port = nameUri.getPort(); } this.proxyDetector = proxyDetector; - this.stopwatch = Preconditions.checkNotNull(stopwatch, "stopwatch"); - this.networkAddressCacheTtlNanos = getNetworkAddressCacheTtlNanos(); } @Override @@ -198,13 +183,6 @@ final class DnsNameResolver extends NameResolver { if (shutdown) { return; } - boolean resourceRefreshRequired = cachedResolutionResults == null - || networkAddressCacheTtlNanos == 0 - || (networkAddressCacheTtlNanos > 0 - && stopwatch.elapsed(TimeUnit.NANOSECONDS) > networkAddressCacheTtlNanos); - if (!resourceRefreshRequired) { - return; - } savedListener = listener; resolving = true; } @@ -229,15 +207,11 @@ final class DnsNameResolver extends NameResolver { ResolutionResults resolutionResults; try { ResourceResolver resourceResolver = null; - if (enableJndi) { + if (shouldUseJndi(enableJndi, enableJndiLocalhost, host)) { resourceResolver = getResourceResolver(); } resolutionResults = resolveAll(addressResolver, resourceResolver, enableSrv, enableTxt, host); - cachedResolutionResults = resolutionResults; - if (networkAddressCacheTtlNanos > 0) { - stopwatch.reset().start(); - } } catch (Exception e) { savedListener.onError( Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e)); @@ -284,23 +258,6 @@ final class DnsNameResolver extends NameResolver { } }; - /** Returns value of network address cache ttl property. */ - private static long getNetworkAddressCacheTtlNanos() { - String cacheTtlPropertyValue = System.getProperty(NETWORKADDRESS_CACHE_TTL_PROPERTY); - long cacheTtl = DEFAULT_NETWORK_CACHE_TTL_SECONDS; - if (cacheTtlPropertyValue != null) { - try { - cacheTtl = Long.parseLong(cacheTtlPropertyValue); - } catch (NumberFormatException e) { - logger.log( - Level.WARNING, - "Property({0}) valid is not valid number format({1}), fall back to default({2})", - new Object[] {NETWORKADDRESS_CACHE_TTL_PROPERTY, cacheTtlPropertyValue, cacheTtl}); - } - } - return cacheTtl > 0 ? TimeUnit.SECONDS.toNanos(cacheTtl) : cacheTtl; - } - @GuardedBy("this") private void resolve() { if (resolving || shutdown) { @@ -368,7 +325,9 @@ final class DnsNameResolver extends NameResolver { } } try { - if (addressesException != null && balancerAddressesException != null) { + if (addressesException != null + && (balancerAddressesException != null || balancerAddresses.isEmpty())) { + Throwables.throwIfUnchecked(addressesException); throw new RuntimeException(addressesException); } } finally { @@ -626,4 +585,28 @@ final class DnsNameResolver extends NameResolver { } return localHostname; } + + @VisibleForTesting + static boolean shouldUseJndi(boolean jndiEnabled, boolean jndiLocalhostEnabled, String target) { + if (!jndiEnabled) { + return false; + } + if ("localhost".equalsIgnoreCase(target)) { + return jndiLocalhostEnabled; + } + // Check if this name looks like IPv6 + if (target.contains(":")) { + return false; + } + // Check if this might be IPv4. Such addresses have no alphabetic characters. This also + // checks the target is empty. + boolean alldigits = true; + for (int i = 0; i < target.length(); i++) { + char c = target.charAt(i); + if (c != '.') { + alldigits &= (c >= '0' && c <= '9'); + } + } + return !alldigits; + } } diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java b/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java index d0db539d4..cddbe3f3b 100644 --- a/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java +++ b/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java @@ -17,7 +17,6 @@ package io.grpc.internal; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import io.grpc.Attributes; import io.grpc.NameResolverProvider; import java.net.URI; @@ -53,8 +52,7 @@ public final class DnsNameResolverProvider extends NameResolverProvider { name, params, GrpcUtil.SHARED_CHANNEL_EXECUTOR, - GrpcUtil.getDefaultProxyDetector(), - Stopwatch.createUnstarted()); + GrpcUtil.getDefaultProxyDetector()); } else { return null; } diff --git a/core/src/main/java/io/grpc/internal/GrpcAttributes.java b/core/src/main/java/io/grpc/internal/GrpcAttributes.java index 67da06fa7..6a63864f6 100644 --- a/core/src/main/java/io/grpc/internal/GrpcAttributes.java +++ b/core/src/main/java/io/grpc/internal/GrpcAttributes.java @@ -17,6 +17,10 @@ package io.grpc.internal; import io.grpc.Attributes; +import io.grpc.EquivalentAddressGroup; +import io.grpc.Grpc; +import io.grpc.NameResolver; +import io.grpc.SecurityLevel; import java.util.Map; /** @@ -26,6 +30,7 @@ public final class GrpcAttributes { /** * Attribute key for service config. */ + @NameResolver.ResolutionResultAttr public static final Attributes.Key<Map<String, Object>> NAME_RESOLVER_SERVICE_CONFIG = Attributes.Key.create("service-config"); @@ -33,6 +38,7 @@ public final class GrpcAttributes { * The naming authority of a gRPC LB server address. It is an address-group-level attribute, * present when the address group is a LoadBalancer. */ + @EquivalentAddressGroup.Attr public static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY = Attributes.Key.create("io.grpc.grpclb.lbAddrAuthority"); @@ -40,8 +46,18 @@ public final class GrpcAttributes { * Whether this EquivalentAddressGroup was provided by a GRPCLB server. It would be rare for this * value to be {@code false}; generally it would be better to not have the key present at all. */ + @EquivalentAddressGroup.Attr public static final Attributes.Key<Boolean> ATTR_LB_PROVIDED_BACKEND = Attributes.Key.create("io.grpc.grpclb.lbProvidedBackend"); + /** + * The security level of the transport. If it's not present, {@link SecurityLevel#NONE} should be + * assumed. + */ + @SuppressWarnings("deprecation") + @Grpc.TransportAttr + public static final Attributes.Key<SecurityLevel> ATTR_SECURITY_LEVEL = + io.grpc.CallCredentials.ATTR_SECURITY_LEVEL; + private GrpcAttributes() {} } diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 539829941..80fd2f52c 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -200,7 +200,7 @@ public final class GrpcUtil { public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults(); - private static final String IMPLEMENTATION_VERSION = "1.16.0-SNAPSHOT"; // CURRENT_GRPC_VERSION + private static final String IMPLEMENTATION_VERSION = "1.16.1"; // CURRENT_GRPC_VERSION /** * The default delay in nanos before we send a keepalive. diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index dc203b016..e1f840ee3 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -125,13 +125,7 @@ final class ManagedChannelImpl extends ManagedChannel implements private final TimeProvider timeProvider; private final int maxTraceEvents; - private final ChannelExecutor channelExecutor = new ChannelExecutor() { - @Override - void handleUncaughtThrowable(Throwable t) { - super.handleUncaughtThrowable(t); - panic(t); - } - }; + private final ChannelExecutor channelExecutor = new PanicChannelExecutor(); private boolean fullStreamDecompression; @@ -236,34 +230,7 @@ final class ManagedChannelImpl extends ManagedChannel implements // Called from channelExecutor private final ManagedClientTransport.Listener delayedTransportListener = - new ManagedClientTransport.Listener() { - @Override - public void transportShutdown(Status s) { - checkState(shutdown.get(), "Channel must have been shut down"); - } - - @Override - public void transportReady() { - // Don't care - } - - @Override - public void transportInUse(final boolean inUse) { - inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); - } - - @Override - public void transportTerminated() { - checkState(shutdown.get(), "Channel must have been shut down"); - terminating = true; - shutdownNameResolverAndLoadBalancer(false); - // No need to call channelStateManager since we are already in SHUTDOWN state. - // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them - // here. - maybeShutdownNowSubchannels(); - maybeTerminateChannel(); - } - }; + new DelayedTransportListener(); // Must be called from channelExecutor private void maybeShutdownNowSubchannels() { @@ -279,27 +246,12 @@ final class ManagedChannelImpl extends ManagedChannel implements // Must be accessed from channelExecutor @VisibleForTesting - final InUseStateAggregator<Object> inUseStateAggregator = - new InUseStateAggregator<Object>() { - @Override - void handleInUse() { - exitIdleMode(); - } - - @Override - void handleNotInUse() { - if (shutdown.get()) { - return; - } - rescheduleIdleTimer(); - } - }; + final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator(); @Override public ListenableFuture<ChannelStats> getStats() { final SettableFuture<ChannelStats> ret = SettableFuture.create(); - // subchannels and oobchannels can only be accessed from channelExecutor - channelExecutor.executeLater(new Runnable() { + final class StatsFetcher implements Runnable { @Override public void run() { ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder(); @@ -314,7 +266,10 @@ final class ManagedChannelImpl extends ManagedChannel implements builder.setSubchannels(children); ret.set(builder.build()); } - }).drain(); + } + + // subchannels and oobchannels can only be accessed from channelExecutor + channelExecutor.executeLater(new StatsFetcher()).drain(); return ret; } @@ -463,7 +418,7 @@ final class ManagedChannelImpl extends ManagedChannel implements } } - private final ClientTransportProvider transportProvider = new ClientTransportProvider() { + private final class ChannelTransportProvider implements ClientTransportProvider { @Override public ClientTransport get(PickSubchannelArgs args) { SubchannelPicker pickerCopy = subchannelPicker; @@ -473,12 +428,14 @@ final class ManagedChannelImpl extends ManagedChannel implements return delayedTransport; } if (pickerCopy == null) { - channelExecutor.executeLater(new Runnable() { - @Override - public void run() { - exitIdleMode(); - } - }).drain(); + final class ExitIdleModeForTransport implements Runnable { + @Override + public void run() { + exitIdleMode(); + } + } + + channelExecutor.executeLater(new ExitIdleModeForTransport()).drain(); return delayedTransport; } // There is no need to reschedule the idle timer here. @@ -507,11 +464,21 @@ final class ManagedChannelImpl extends ManagedChannel implements final Metadata headers, final Context context) { checkState(retryEnabled, "retry should be enabled"); - return new RetriableStream<ReqT>( - method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, - getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(), - callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY), - throttle) { + final class RetryStream extends RetriableStream<ReqT> { + RetryStream() { + super( + method, + headers, + channelBufferUsed, + perRpcBufferLimit, + channelBufferLimit, + getCallExecutor(callOptions), + transportFactory.getScheduledExecutorService(), + callOptions.getOption(RETRY_POLICY_KEY), + callOptions.getOption(HEDGING_POLICY_KEY), + throttle); + } + @Override Status prestart() { return uncommittedRetriableStreamsRegistry.add(this); @@ -534,9 +501,13 @@ final class ManagedChannelImpl extends ManagedChannel implements context.detach(origContext); } } - }; + } + + return new RetryStream(); } - }; + } + + private final ClientTransportProvider transportProvider = new ChannelTransportProvider(); private final Rescheduler idleTimer; @@ -552,8 +523,16 @@ final class ManagedChannelImpl extends ManagedChannel implements this.nameResolverFactory = builder.getNameResolverFactory(); this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams"); this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); + this.timeProvider = checkNotNull(timeProvider, "timeProvider"); + maxTraceEvents = builder.maxTraceEvents; + if (maxTraceEvents > 0) { + long currentTimeNanos = timeProvider.currentTimeNanos(); + channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel"); + } else { + channelTracer = null; + } if (builder.loadBalancerFactory == null) { - this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(); + this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(channelTracer, timeProvider); } else { this.loadBalancerFactory = builder.loadBalancerFactory; } @@ -568,7 +547,7 @@ final class ManagedChannelImpl extends ManagedChannel implements this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry; serviceConfigInterceptor = new ServiceConfigInterceptor( retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts); - Channel channel = new RealChannel(); + Channel channel = new RealChannel(nameResolver.getServiceAuthority()); channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor); if (builder.binlog != null) { channel = builder.binlog.wrapChannel(channel); @@ -606,24 +585,18 @@ final class ManagedChannelImpl extends ManagedChannel implements this.channelBufferLimit = builder.retryBufferSize; this.perRpcBufferLimit = builder.perRpcBufferLimit; - this.timeProvider = checkNotNull(timeProvider, "timeProvider"); - this.callTracerFactory = new CallTracer.Factory() { + final class ChannelCallTracerFactory implements CallTracer.Factory { @Override public CallTracer create() { return new CallTracer(timeProvider); } - }; + } + + this.callTracerFactory = new ChannelCallTracerFactory(); channelCallTracer = callTracerFactory.create(); this.channelz = checkNotNull(builder.channelz); channelz.addRootChannel(this); - maxTraceEvents = builder.maxTraceEvents; - if (maxTraceEvents > 0) { - long currentTimeNanos = timeProvider.currentTimeNanos(); - channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel"); - } else { - channelTracer = null; - } logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target}); } @@ -687,7 +660,7 @@ final class ManagedChannelImpl extends ManagedChannel implements // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the // channelExecutor's queue and should not be blocked, so we do not drain() immediately here. - channelExecutor.executeLater(new Runnable() { + final class Shutdown implements Runnable { @Override public void run() { if (channelTracer != null) { @@ -699,15 +672,19 @@ final class ManagedChannelImpl extends ManagedChannel implements } channelStateManager.gotoState(SHUTDOWN); } - }); + } + + channelExecutor.executeLater(new Shutdown()); uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS); - channelExecutor.executeLater(new Runnable() { - @Override - public void run() { - cancelIdleTimer(/* permanent= */ true); - } - }).drain(); + final class CancelIdleTimer implements Runnable { + @Override + public void run() { + cancelIdleTimer(/* permanent= */ true); + } + } + + channelExecutor.executeLater(new CancelIdleTimer()).drain(); logger.log(Level.FINE, "[{0}] Shutting down", getLogId()); return this; } @@ -722,16 +699,18 @@ final class ManagedChannelImpl extends ManagedChannel implements logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId()); shutdown(); uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS); - channelExecutor.executeLater(new Runnable() { - @Override - public void run() { - if (shutdownNowed) { - return; - } - shutdownNowed = true; - maybeShutdownNowSubchannels(); + final class ShutdownNow implements Runnable { + @Override + public void run() { + if (shutdownNowed) { + return; } - }).drain(); + shutdownNowed = true; + maybeShutdownNowSubchannels(); + } + } + + channelExecutor.executeLater(new ShutdownNow()).drain(); return this; } @@ -745,16 +724,18 @@ final class ManagedChannelImpl extends ManagedChannel implements panicMode = true; cancelIdleTimer(/* permanent= */ true); shutdownNameResolverAndLoadBalancer(false); - SubchannelPicker newPicker = new SubchannelPicker() { - final PickResult panicPickResult = + final class PanicSubchannelPicker extends SubchannelPicker { + private final PickResult panicPickResult = PickResult.withDrop( Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t)); + @Override public PickResult pickSubchannel(PickSubchannelArgs args) { return panicPickResult; } - }; - updateSubchannelPicker(newPicker); + } + + updateSubchannelPicker(new PanicSubchannelPicker()); if (channelTracer != null) { channelTracer.reportEvent( new ChannelTrace.Event.Builder() @@ -810,6 +791,14 @@ final class ManagedChannelImpl extends ManagedChannel implements } private class RealChannel extends Channel { + // Set when the NameResolver is initially created. When we create a new NameResolver for the + // same target, the new instance must have the same value. + private final String authority; + + private RealChannel(String authority) { + this.authority = checkNotNull(authority, "authority"); + } + @Override public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) { @@ -828,8 +817,7 @@ final class ManagedChannelImpl extends ManagedChannel implements @Override public String authority() { - String authority = nameResolver.getServiceAuthority(); - return checkNotNull(authority, "authority"); + return authority; } } @@ -856,60 +844,61 @@ final class ManagedChannelImpl extends ManagedChannel implements public ConnectivityState getState(boolean requestConnection) { ConnectivityState savedChannelState = channelStateManager.getState(); if (requestConnection && savedChannelState == IDLE) { - channelExecutor.executeLater( - new Runnable() { - @Override - public void run() { - exitIdleMode(); - if (subchannelPicker != null) { - subchannelPicker.requestConnection(); - } - } - }).drain(); + final class RequestConnection implements Runnable { + @Override + public void run() { + exitIdleMode(); + if (subchannelPicker != null) { + subchannelPicker.requestConnection(); + } + } + } + + channelExecutor.executeLater(new RequestConnection()).drain(); } return savedChannelState; } @Override public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) { - channelExecutor.executeLater( - new Runnable() { - @Override - public void run() { - channelStateManager.notifyWhenStateChanged(callback, executor, source); - } - }).drain(); + final class NotifyStateChanged implements Runnable { + @Override + public void run() { + channelStateManager.notifyWhenStateChanged(callback, executor, source); + } + } + + channelExecutor.executeLater(new NotifyStateChanged()).drain(); } @Override public void resetConnectBackoff() { - channelExecutor - .executeLater( - new Runnable() { - @Override - public void run() { - if (shutdown.get()) { - return; - } - if (nameResolverRefreshFuture != null) { - checkState(nameResolverStarted, "name resolver must be started"); - cancelNameResolverBackoff(); - nameResolver.refresh(); - } - for (InternalSubchannel subchannel : subchannels) { - subchannel.resetConnectBackoff(); - } - for (OobChannel oobChannel : oobChannels) { - oobChannel.resetConnectBackoff(); - } - } - }) - .drain(); + final class ResetConnectBackoff implements Runnable { + @Override + public void run() { + if (shutdown.get()) { + return; + } + if (nameResolverRefreshFuture != null) { + checkState(nameResolverStarted, "name resolver must be started"); + cancelNameResolverBackoff(); + nameResolver.refresh(); + } + for (InternalSubchannel subchannel : subchannels) { + subchannel.resetConnectBackoff(); + } + for (OobChannel oobChannel : oobChannels) { + oobChannel.resetConnectBackoff(); + } + } + } + + channelExecutor.executeLater(new ResetConnectBackoff()).drain(); } @Override public void enterIdle() { - class PrepareToLoseNetworkRunnable implements Runnable { + final class PrepareToLoseNetworkRunnable implements Runnable { @Override public void run() { if (shutdown.get() || lbHelper == null) { @@ -933,7 +922,7 @@ final class ManagedChannelImpl extends ManagedChannel implements final Object lock = new Object(); @GuardedBy("lock") - Collection<ClientStream> uncommittedRetriableStreams = new HashSet<ClientStream>(); + Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>(); @GuardedBy("lock") Status shutdownStatus; @@ -996,7 +985,7 @@ final class ManagedChannelImpl extends ManagedChannel implements shutdownStatusCopy = shutdownStatus; // Because retriable transport is long-lived, we take this opportunity to down-size the // hashmap. - uncommittedRetriableStreams = new HashSet<ClientStream>(); + uncommittedRetriableStreams = new HashSet<>(); } } @@ -1034,6 +1023,36 @@ final class ManagedChannelImpl extends ManagedChannel implements if (maxTraceEvents > 0) { subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel"); } + + final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback { + // All callbacks are run in channelExecutor + @Override + void onTerminated(InternalSubchannel is) { + subchannels.remove(is); + channelz.removeSubchannel(is); + maybeTerminateChannel(); + } + + @Override + void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + handleInternalSubchannelState(newState); + // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. + if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { + lb.handleSubchannelState(subchannel, newState); + } + } + + @Override + void onInUse(InternalSubchannel is) { + inUseStateAggregator.updateObjectInUse(is, true); + } + + @Override + void onNotInUse(InternalSubchannel is) { + inUseStateAggregator.updateObjectInUse(is, false); + } + } + final InternalSubchannel internalSubchannel = new InternalSubchannel( addressGroups, authority(), @@ -1043,34 +1062,7 @@ final class ManagedChannelImpl extends ManagedChannel implements transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, - new InternalSubchannel.Callback() { - // All callbacks are run in channelExecutor - @Override - void onTerminated(InternalSubchannel is) { - subchannels.remove(is); - channelz.removeSubchannel(is); - maybeTerminateChannel(); - } - - @Override - void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { - handleInternalSubchannelState(newState); - // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. - if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { - lb.handleSubchannelState(subchannel, newState); - } - } - - @Override - void onInUse(InternalSubchannel is) { - inUseStateAggregator.updateObjectInUse(is, true); - } - - @Override - void onNotInUse(InternalSubchannel is) { - inUseStateAggregator.updateObjectInUse(is, false); - } - }, + new ManagedInternalSubchannelCallback(), channelz, callTracerFactory.create(), subchannelTracer, @@ -1087,24 +1079,27 @@ final class ManagedChannelImpl extends ManagedChannel implements subchannel.subchannel = internalSubchannel; logger.log(Level.FINE, "[{0}] {1} created for {2}", new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups}); - runSerialized(new Runnable() { - @Override - public void run() { - if (terminating) { - // Because runSerialized() doesn't guarantee the runnable has been executed upon when - // returning, the subchannel may still be returned to the balancer without being - // shutdown even if "terminating" is already true. The subchannel will not be used in - // this case, because delayed transport has terminated when "terminating" becomes - // true, and no more requests will be sent to balancer beyond this point. - internalSubchannel.shutdown(SHUTDOWN_STATUS); - } - if (!terminated) { - // If channel has not terminated, it will track the subchannel and block termination - // for it. - subchannels.add(internalSubchannel); - } + + final class AddSubchannel implements Runnable { + @Override + public void run() { + if (terminating) { + // Because runSerialized() doesn't guarantee the runnable has been executed upon when + // returning, the subchannel may still be returned to the balancer without being + // shutdown even if "terminating" is already true. The subchannel will not be used in + // this case, because delayed transport has terminated when "terminating" becomes + // true, and no more requests will be sent to balancer beyond this point. + internalSubchannel.shutdown(SHUTDOWN_STATUS); + } + if (!terminated) { + // If channel has not terminated, it will track the subchannel and block termination + // for it. + subchannels.add(internalSubchannel); } - }); + } + } + + runSerialized(new AddSubchannel()); return subchannel; } @@ -1113,30 +1108,30 @@ final class ManagedChannelImpl extends ManagedChannel implements final ConnectivityState newState, final SubchannelPicker newPicker) { checkNotNull(newState, "newState"); checkNotNull(newPicker, "newPicker"); - - runSerialized( - new Runnable() { - @Override - public void run() { - if (LbHelperImpl.this != lbHelper) { - return; - } - updateSubchannelPicker(newPicker); - // It's not appropriate to report SHUTDOWN state from lb. - // Ignore the case of newState == SHUTDOWN for now. - if (newState != SHUTDOWN) { - if (channelTracer != null) { - channelTracer.reportEvent( - new ChannelTrace.Event.Builder() - .setDescription("Entering " + newState + " state") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(timeProvider.currentTimeNanos()) - .build()); - } - channelStateManager.gotoState(newState); - } + final class UpdateBalancingState implements Runnable { + @Override + public void run() { + if (LbHelperImpl.this != lbHelper) { + return; + } + updateSubchannelPicker(newPicker); + // It's not appropriate to report SHUTDOWN state from lb. + // Ignore the case of newState == SHUTDOWN for now. + if (newState != SHUTDOWN) { + if (channelTracer != null) { + channelTracer.reportEvent( + new ChannelTrace.Event.Builder() + .setDescription("Entering " + newState + " state") + .setSeverity(ChannelTrace.Event.Severity.CT_INFO) + .setTimestampNanos(timeProvider.currentTimeNanos()) + .build()); } - }); + channelStateManager.gotoState(newState); + } + } + } + + runSerialized(new UpdateBalancingState()); } @Override @@ -1169,26 +1164,28 @@ final class ManagedChannelImpl extends ManagedChannel implements .build()); subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel"); } + final class ManagedOobChannelCallback extends InternalSubchannel.Callback { + @Override + void onTerminated(InternalSubchannel is) { + oobChannels.remove(oobChannel); + channelz.removeSubchannel(is); + oobChannel.handleSubchannelTerminated(); + maybeTerminateChannel(); + } + + @Override + void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + handleInternalSubchannelState(newState); + oobChannel.handleSubchannelStateChange(newState); + } + } + final InternalSubchannel internalSubchannel = new InternalSubchannel( Collections.singletonList(addressGroup), authority, userAgent, backoffPolicyProvider, transportFactory, transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, // All callback methods are run from channelExecutor - new InternalSubchannel.Callback() { - @Override - void onTerminated(InternalSubchannel is) { - oobChannels.remove(oobChannel); - channelz.removeSubchannel(is); - oobChannel.handleSubchannelTerminated(); - maybeTerminateChannel(); - } - - @Override - void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { - handleInternalSubchannelState(newState); - oobChannel.handleSubchannelStateChange(newState); - } - }, + new ManagedOobChannelCallback(), channelz, callTracerFactory.create(), subchannelTracer, @@ -1204,19 +1201,21 @@ final class ManagedChannelImpl extends ManagedChannel implements channelz.addSubchannel(oobChannel); channelz.addSubchannel(internalSubchannel); oobChannel.setSubchannel(internalSubchannel); - runSerialized(new Runnable() { - @Override - public void run() { - if (terminating) { - oobChannel.shutdown(); - } - if (!terminated) { - // If channel has not terminated, it will track the subchannel and block termination - // for it. - oobChannels.add(oobChannel); - } + final class AddOobChannel implements Runnable { + @Override + public void run() { + if (terminating) { + oobChannel.shutdown(); + } + if (!terminated) { + // If channel has not terminated, it will track the subchannel and block termination + // for it. + oobChannels.add(oobChannel); } - }); + } + } + + runSerialized(new AddOobChannel()); return oobChannel; } @@ -1325,41 +1324,40 @@ final class ManagedChannelImpl extends ManagedChannel implements .build()); haveBackends = false; } - channelExecutor - .executeLater( - new Runnable() { - @Override - public void run() { - // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. - if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { - return; - } - helper.lb.handleNameResolutionError(error); - if (nameResolverRefreshFuture != null) { - // The name resolver may invoke onError multiple times, but we only want to - // schedule one backoff attempt - // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we - // want to reset the backoff interval upon repeated onError() calls - return; - } - if (nameResolverBackoffPolicy == null) { - nameResolverBackoffPolicy = backoffPolicyProvider.get(); - } - long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos(); - if (logger.isLoggable(Level.FINE)) { - logger.log( - Level.FINE, - "[{0}] Scheduling DNS resolution backoff for {1} ns", - new Object[] {logId, delayNanos}); - } - nameResolverRefresh = new NameResolverRefresh(); - nameResolverRefreshFuture = - transportFactory - .getScheduledExecutorService() - .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS); - } - }) - .drain(); + final class NameResolverErrorHandler implements Runnable { + @Override + public void run() { + // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. + if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { + return; + } + helper.lb.handleNameResolutionError(error); + if (nameResolverRefreshFuture != null) { + // The name resolver may invoke onError multiple times, but we only want to + // schedule one backoff attempt + // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we + // want to reset the backoff interval upon repeated onError() calls + return; + } + if (nameResolverBackoffPolicy == null) { + nameResolverBackoffPolicy = backoffPolicyProvider.get(); + } + long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos(); + if (logger.isLoggable(Level.FINE)) { + logger.log( + Level.FINE, + "[{0}] Scheduling DNS resolution backoff for {1} ns", + new Object[] {logId, delayNanos}); + } + nameResolverRefresh = new NameResolverRefresh(); + nameResolverRefreshFuture = + transportFactory + .getScheduledExecutorService() + .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS); + } + } + + channelExecutor.executeLater(new NameResolverErrorHandler()).drain(); } } @@ -1419,14 +1417,16 @@ final class ManagedChannelImpl extends ManagedChannel implements // TODO(zhangkun83): consider a better approach // (https://github.com/grpc/grpc-java/issues/2562). if (!terminating) { + final class ShutdownSubchannel implements Runnable { + @Override + public void run() { + subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS); + } + } + delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule( new LogExceptionRunnable( - new Runnable() { - @Override - public void run() { - subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS); - } - }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); + new ShutdownSubchannel()), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); return; } } @@ -1463,4 +1463,62 @@ final class ManagedChannelImpl extends ManagedChannel implements .add("target", target) .toString(); } + + private final class PanicChannelExecutor extends ChannelExecutor { + @Override + void handleUncaughtThrowable(Throwable t) { + super.handleUncaughtThrowable(t); + panic(t); + } + } + + /** + * Called from channelExecutor. + */ + private final class DelayedTransportListener implements ManagedClientTransport.Listener { + @Override + public void transportShutdown(Status s) { + checkState(shutdown.get(), "Channel must have been shut down"); + } + + @Override + public void transportReady() { + // Don't care + } + + @Override + public void transportInUse(final boolean inUse) { + inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); + } + + @Override + public void transportTerminated() { + checkState(shutdown.get(), "Channel must have been shut down"); + terminating = true; + shutdownNameResolverAndLoadBalancer(false); + // No need to call channelStateManager since we are already in SHUTDOWN state. + // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them + // here. + maybeShutdownNowSubchannels(); + maybeTerminateChannel(); + } + } + + /** + * Must be accessed from channelExecutor. + */ + private final class IdleModeStateAggregator extends InUseStateAggregator<Object> { + @Override + void handleInUse() { + exitIdleMode(); + } + + @Override + void handleNotInUse() { + if (shutdown.get()) { + return; + } + rescheduleIdleTimer(); + } + } } diff --git a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java index 48e00665c..56548b282 100644 --- a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java +++ b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java @@ -20,7 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import io.grpc.CallCredentials.MetadataApplier; +import io.grpc.CallCredentials2.MetadataApplier; import io.grpc.CallOptions; import io.grpc.Context; import io.grpc.Metadata; @@ -29,7 +29,7 @@ import io.grpc.Status; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; -final class MetadataApplierImpl implements MetadataApplier { +final class MetadataApplierImpl extends MetadataApplier { private final ClientTransport transport; private final MethodDescriptor<?, ?> method; private final Metadata origHeaders; diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java new file mode 100644 index 000000000..e2a9720d4 --- /dev/null +++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java @@ -0,0 +1,95 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import com.google.common.base.MoreObjects; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; +import io.grpc.ExperimentalApi; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancer; +import io.grpc.ManagedChannel; +import io.grpc.NameResolver; +import java.util.List; + +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771") +public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper { + /** + * Returns the underlying helper. + */ + protected abstract LoadBalancer.Helper delegate(); + + @Override + public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + return delegate().createSubchannel(addrs, attrs); + } + + @Override + public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) { + return delegate().createSubchannel(addrs, attrs); + } + + @Override + public void updateSubchannelAddresses( + Subchannel subchannel, EquivalentAddressGroup addrs) { + delegate().updateSubchannelAddresses(subchannel, addrs); + } + + @Override + public void updateSubchannelAddresses( + Subchannel subchannel, List<EquivalentAddressGroup> addrs) { + delegate().updateSubchannelAddresses(subchannel, addrs); + } + + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + return delegate().createOobChannel(eag, authority); + } + + @Override + public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) { + delegate().updateOobChannelAddresses(channel, eag); + } + + @Override + public void updateBalancingState( + ConnectivityState newState, SubchannelPicker newPicker) { + delegate().updateBalancingState(newState, newPicker); + } + + @Override + public void runSerialized(Runnable task) { + delegate().runSerialized(task); + } + + @Override + public NameResolver.Factory getNameResolverFactory() { + return delegate().getNameResolverFactory(); + } + + @Override + public String getAuthority() { + return delegate().getAuthority(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); + } +} diff --git a/core/src/test/java/io/grpc/ForwardingTestUtil.java b/core/src/test/java/io/grpc/ForwardingTestUtil.java index ce9ca5eb7..e9f1c5b23 100644 --- a/core/src/test/java/io/grpc/ForwardingTestUtil.java +++ b/core/src/test/java/io/grpc/ForwardingTestUtil.java @@ -28,6 +28,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.Collection; +import javax.annotation.Nullable; /** * A util class to help test forwarding classes. @@ -35,9 +36,8 @@ import java.util.Collection; public final class ForwardingTestUtil { /** * Use reflection to perform a basic sanity test. The forwarding class should forward all public - * methods to the delegate, except for those in skippedMethods. - * This does NOT verify that arguments or return values are forwarded properly. It only alerts - * the developer if a forward method is missing. + * methods to the delegate, except for those in skippedMethods. This does NOT verify that + * arguments or return values are forwarded properly. * * @param delegateClass The class whose methods should be forwarded. * @param mockDelegate The mockito mock of the delegate class. @@ -49,6 +49,34 @@ public final class ForwardingTestUtil { T mockDelegate, T forwarder, Collection<Method> skippedMethods) throws Exception { + testMethodsForwarded( + delegateClass, mockDelegate, forwarder, skippedMethods, + new ArgumentProvider() { + @Override + public Object get(Class<?> clazz) { + return null; + } + }); + } + + /** + * Use reflection to perform a basic sanity test. The forwarding class should forward all public + * methods to the delegate, except for those in skippedMethods. This does NOT verify that return + * values are forwarded properly, and can only verify the propagation of arguments for which + * {@code argProvider} returns distinctive non-null values. + * + * @param delegateClass The class whose methods should be forwarded. + * @param mockDelegate The mockito mock of the delegate class. + * @param forwarder The forwarder object that forwards to the mockDelegate. + * @param skippedMethods A collection of methods that are skipped by the test. + * @param argProvider provides argument to be passed to tested forwarding methods. + */ + public static <T> void testMethodsForwarded( + Class<T> delegateClass, + T mockDelegate, + T forwarder, + Collection<Method> skippedMethods, + ArgumentProvider argProvider) throws Exception { assertTrue(mockingDetails(mockDelegate).isMock()); assertFalse(mockingDetails(forwarder).isMock()); @@ -61,7 +89,9 @@ public final class ForwardingTestUtil { Class<?>[] argTypes = method.getParameterTypes(); Object[] args = new Object[argTypes.length]; for (int i = 0; i < argTypes.length; i++) { - args[i] = Defaults.defaultValue(argTypes[i]); + if ((args[i] = argProvider.get(argTypes[i])) == null) { + args[i] = Defaults.defaultValue(argTypes[i]); + } } method.invoke(forwarder, args); try { @@ -85,4 +115,20 @@ public final class ForwardingTestUtil { assertEquals("Method toString() was not forwarded properly", expected, actual); } } + + /** + * Provides arguments for forwarded methods tested in {@link #testMethodsForwarded}. + */ + public interface ArgumentProvider { + /** + * Return an instance of the given class to be used as an argument passed to one method call. + * If one method has multiple arguments with the same type, each occurrence will call this + * method once. It is recommended that each invocation returns a distinctive object for the + * same type, in order to verify that arguments are passed by the tested class correctly. + * + * @return a value to be passed as an argument. If {@code null}, {@link Default#defaultValue} + * will be used. + */ + @Nullable Object get(Class<?> clazz); + } } diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index 1542a6fac..71c574298 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -19,11 +19,14 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; import io.grpc.Attributes; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.InternalChannelz; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Subchannel; @@ -52,7 +55,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class AutoConfiguredLoadBalancerFactoryTest { - private final AutoConfiguredLoadBalancerFactory lbf = new AutoConfiguredLoadBalancerFactory(); + private final AutoConfiguredLoadBalancerFactory lbf = + new AutoConfiguredLoadBalancerFactory(null, null); @Test public void newLoadBalancer_isAuto() { @@ -258,6 +262,80 @@ public class AutoConfiguredLoadBalancerFactoryTest { } } + @Test + public void channelTracing_lbPolicyChanged() { + ChannelTracer channelTracer = new ChannelTracer(100, 1000, "dummy_type"); + TimeProvider timeProvider = new TimeProvider() { + @Override + public long currentTimeNanos() { + return 101; + } + }; + + InternalChannelz.ChannelStats.Builder statsBuilder + = new InternalChannelz.ChannelStats.Builder(); + channelTracer.updateBuilder(statsBuilder); + List<EquivalentAddressGroup> servers = + Collections.singletonList( + new EquivalentAddressGroup(new SocketAddress(){}, Attributes.EMPTY)); + Helper helper = new TestHelper() { + @Override + public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) { + return new TestSubchannel(addrs, attrs); + } + + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + return mock(ManagedChannel.class, RETURNS_DEEP_STUBS); + } + + @Override + public String getAuthority() { + return "fake_authority"; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + // noop + } + }; + int prevNumOfEvents = statsBuilder.build().channelTrace.events.size(); + + LoadBalancer lb = + new AutoConfiguredLoadBalancerFactory(channelTracer, timeProvider).newLoadBalancer(helper); + lb.handleResolvedAddressGroups(servers, Attributes.EMPTY); + channelTracer.updateBuilder(statsBuilder); + assertThat(statsBuilder.build().channelTrace.events).hasSize(prevNumOfEvents); + + Map<String, Object> serviceConfig = new HashMap<String, Object>(); + serviceConfig.put("loadBalancingPolicy", "round_robin"); + lb.handleResolvedAddressGroups(servers, + Attributes.newBuilder() + .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build()); + channelTracer.updateBuilder(statsBuilder); + assertThat(statsBuilder.build().channelTrace.events).hasSize(prevNumOfEvents + 1); + assertThat(statsBuilder.build().channelTrace.events.get(prevNumOfEvents).description) + .isEqualTo("Load balancer changed from PickFirstBalancer to RoundRobinLoadBalancer"); + prevNumOfEvents = statsBuilder.build().channelTrace.events.size(); + + serviceConfig.put("loadBalancingPolicy", "round_robin"); + lb.handleResolvedAddressGroups(servers, + Attributes.newBuilder() + .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build()); + channelTracer.updateBuilder(statsBuilder); + assertThat(statsBuilder.build().channelTrace.events).hasSize(prevNumOfEvents); + + servers = Collections.singletonList(new EquivalentAddressGroup( + new SocketAddress(){}, + Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); + lb.handleResolvedAddressGroups(servers, Attributes.EMPTY); + + channelTracer.updateBuilder(statsBuilder); + assertThat(statsBuilder.build().channelTrace.events).hasSize(prevNumOfEvents + 1); + assertThat(statsBuilder.build().channelTrace.events.get(prevNumOfEvents).description) + .isEqualTo("Load balancer changed from RoundRobinLoadBalancer to GrpclbLoadBalancer"); + } + public static class ForwardingLoadBalancer extends LoadBalancer { private final LoadBalancer delegate; diff --git a/core/src/test/java/io/grpc/internal/CallCredentials2ApplyingTest.java b/core/src/test/java/io/grpc/internal/CallCredentials2ApplyingTest.java new file mode 100644 index 000000000..fd2b2c18f --- /dev/null +++ b/core/src/test/java/io/grpc/internal/CallCredentials2ApplyingTest.java @@ -0,0 +1,291 @@ +/* + * Copyright 2016 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.grpc.Attributes; +import io.grpc.CallCredentials.RequestInfo; +import io.grpc.CallCredentials2; +import io.grpc.CallCredentials2.MetadataApplier; +import io.grpc.CallOptions; +import io.grpc.IntegerMarshaller; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.SecurityLevel; +import io.grpc.Status; +import io.grpc.StringMarshaller; +import java.net.SocketAddress; +import java.util.concurrent.Executor; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Unit test for {@link CallCredentials2} applying functionality implemented by {@link + * CallCredentialsApplyingTransportFactory} and {@link MetadataApplierImpl}. + */ +@RunWith(JUnit4.class) +public class CallCredentials2ApplyingTest { + @Mock + private ClientTransportFactory mockTransportFactory; + + @Mock + private ConnectionClientTransport mockTransport; + + @Mock + private ClientStream mockStream; + + @Mock + private CallCredentials2 mockCreds; + + @Mock + private Executor mockExecutor; + + @Mock + private SocketAddress address; + + private static final String AUTHORITY = "testauthority"; + private static final String USER_AGENT = "testuseragent"; + private static final Attributes.Key<String> ATTR_KEY = Attributes.Key.create("somekey"); + private static final String ATTR_VALUE = "somevalue"; + private static final MethodDescriptor<String, Integer> method = + MethodDescriptor.<String, Integer>newBuilder() + .setType(MethodDescriptor.MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(new StringMarshaller()) + .setResponseMarshaller(new IntegerMarshaller()) + .build(); + private static final Metadata.Key<String> ORIG_HEADER_KEY = + Metadata.Key.of("header1", Metadata.ASCII_STRING_MARSHALLER); + private static final String ORIG_HEADER_VALUE = "some original header value"; + private static final Metadata.Key<String> CREDS_KEY = + Metadata.Key.of("test-creds", Metadata.ASCII_STRING_MARSHALLER); + private static final String CREDS_VALUE = "some credentials"; + + private final Metadata origHeaders = new Metadata(); + private ForwardingConnectionClientTransport transport; + private CallOptions callOptions; + + @Before + public void setUp() { + ClientTransportFactory.ClientTransportOptions clientTransportOptions = + new ClientTransportFactory.ClientTransportOptions() + .setAuthority(AUTHORITY) + .setUserAgent(USER_AGENT); + + MockitoAnnotations.initMocks(this); + origHeaders.put(ORIG_HEADER_KEY, ORIG_HEADER_VALUE); + when(mockTransportFactory.newClientTransport(address, clientTransportOptions)) + .thenReturn(mockTransport); + when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class))) + .thenReturn(mockStream); + ClientTransportFactory transportFactory = new CallCredentialsApplyingTransportFactory( + mockTransportFactory, mockExecutor); + transport = (ForwardingConnectionClientTransport) + transportFactory.newClientTransport(address, clientTransportOptions); + callOptions = CallOptions.DEFAULT.withCallCredentials(mockCreds); + verify(mockTransportFactory).newClientTransport(address, clientTransportOptions); + assertSame(mockTransport, transport.delegate()); + } + + @Test + public void parameterPropagation_base() { + Attributes transportAttrs = Attributes.newBuilder().set(ATTR_KEY, ATTR_VALUE).build(); + when(mockTransport.getAttributes()).thenReturn(transportAttrs); + + transport.newStream(method, origHeaders, callOptions); + + ArgumentCaptor<RequestInfo> infoCaptor = ArgumentCaptor.forClass(null); + verify(mockCreds).applyRequestMetadata( + infoCaptor.capture(), same(mockExecutor), any(MetadataApplier.class)); + RequestInfo info = infoCaptor.getValue(); + assertSame(method, info.getMethodDescriptor()); + Attributes attrs = info.getTransportAttrs(); + assertSame(ATTR_VALUE, info.getTransportAttrs().get(ATTR_KEY)); + assertSame(AUTHORITY, info.getAuthority()); + assertSame(SecurityLevel.NONE, info.getSecurityLevel()); + } + + @Test + public void parameterPropagation_transportSetSecurityLevel() { + Attributes transportAttrs = Attributes.newBuilder() + .set(ATTR_KEY, ATTR_VALUE) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.INTEGRITY) + .build(); + when(mockTransport.getAttributes()).thenReturn(transportAttrs); + + transport.newStream(method, origHeaders, callOptions); + + ArgumentCaptor<RequestInfo> infoCaptor = ArgumentCaptor.forClass(null); + verify(mockCreds).applyRequestMetadata( + infoCaptor.capture(), same(mockExecutor), any(MetadataApplier.class)); + RequestInfo info = infoCaptor.getValue(); + assertSame(method, info.getMethodDescriptor()); + assertSame(ATTR_VALUE, info.getTransportAttrs().get(ATTR_KEY)); + assertSame(AUTHORITY, info.getAuthority()); + assertSame(SecurityLevel.INTEGRITY, info.getSecurityLevel()); + } + + @Test + public void parameterPropagation_callOptionsSetAuthority() { + Attributes transportAttrs = Attributes.newBuilder() + .set(ATTR_KEY, ATTR_VALUE) + .build(); + when(mockTransport.getAttributes()).thenReturn(transportAttrs); + Executor anotherExecutor = mock(Executor.class); + + transport.newStream(method, origHeaders, + callOptions.withAuthority("calloptions-authority").withExecutor(anotherExecutor)); + + ArgumentCaptor<RequestInfo> infoCaptor = ArgumentCaptor.forClass(null); + verify(mockCreds).applyRequestMetadata( + infoCaptor.capture(), same(anotherExecutor), any(MetadataApplier.class)); + RequestInfo info = infoCaptor.getValue(); + assertSame(method, info.getMethodDescriptor()); + assertSame(ATTR_VALUE, info.getTransportAttrs().get(ATTR_KEY)); + assertEquals("calloptions-authority", info.getAuthority()); + assertSame(SecurityLevel.NONE, info.getSecurityLevel()); + } + + @Test + public void credentialThrows() { + final RuntimeException ex = new RuntimeException(); + when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY); + doThrow(ex).when(mockCreds).applyRequestMetadata( + any(RequestInfo.class), same(mockExecutor), any(MetadataApplier.class)); + + FailingClientStream stream = + (FailingClientStream) transport.newStream(method, origHeaders, callOptions); + + verify(mockTransport, never()).newStream(method, origHeaders, callOptions); + assertEquals(Status.Code.UNAUTHENTICATED, stream.getError().getCode()); + assertSame(ex, stream.getError().getCause()); + } + + @Test + public void applyMetadata_inline() { + when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + MetadataApplier applier = (MetadataApplier) invocation.getArguments()[2]; + Metadata headers = new Metadata(); + headers.put(CREDS_KEY, CREDS_VALUE); + applier.apply(headers); + return null; + } + }).when(mockCreds).applyRequestMetadata( + any(RequestInfo.class), same(mockExecutor), any(MetadataApplier.class)); + + ClientStream stream = transport.newStream(method, origHeaders, callOptions); + + verify(mockTransport).newStream(method, origHeaders, callOptions); + assertSame(mockStream, stream); + assertEquals(CREDS_VALUE, origHeaders.get(CREDS_KEY)); + assertEquals(ORIG_HEADER_VALUE, origHeaders.get(ORIG_HEADER_KEY)); + } + + @Test + public void fail_inline() { + final Status error = Status.FAILED_PRECONDITION.withDescription("channel not secure for creds"); + when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + MetadataApplier applier = (MetadataApplier) invocation.getArguments()[2]; + applier.fail(error); + return null; + } + }).when(mockCreds).applyRequestMetadata( + any(RequestInfo.class), same(mockExecutor), any(MetadataApplier.class)); + + FailingClientStream stream = + (FailingClientStream) transport.newStream(method, origHeaders, callOptions); + + verify(mockTransport, never()).newStream(method, origHeaders, callOptions); + assertSame(error, stream.getError()); + } + + @Test + public void applyMetadata_delayed() { + when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY); + + // Will call applyRequestMetadata(), which is no-op. + DelayedStream stream = (DelayedStream) transport.newStream(method, origHeaders, callOptions); + + ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null); + verify(mockCreds).applyRequestMetadata( + any(RequestInfo.class), same(mockExecutor), applierCaptor.capture()); + verify(mockTransport, never()).newStream(method, origHeaders, callOptions); + + Metadata headers = new Metadata(); + headers.put(CREDS_KEY, CREDS_VALUE); + applierCaptor.getValue().apply(headers); + + verify(mockTransport).newStream(method, origHeaders, callOptions); + assertSame(mockStream, stream.getRealStream()); + assertEquals(CREDS_VALUE, origHeaders.get(CREDS_KEY)); + assertEquals(ORIG_HEADER_VALUE, origHeaders.get(ORIG_HEADER_KEY)); + } + + @Test + public void fail_delayed() { + when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY); + + // Will call applyRequestMetadata(), which is no-op. + DelayedStream stream = (DelayedStream) transport.newStream(method, origHeaders, callOptions); + + ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null); + verify(mockCreds).applyRequestMetadata( + any(RequestInfo.class), same(mockExecutor), applierCaptor.capture()); + + Status error = Status.FAILED_PRECONDITION.withDescription("channel not secure for creds"); + applierCaptor.getValue().fail(error); + + verify(mockTransport, never()).newStream(method, origHeaders, callOptions); + FailingClientStream failingStream = (FailingClientStream) stream.getRealStream(); + assertSame(error, failingStream.getError()); + } + + @Test + public void noCreds() { + callOptions = callOptions.withCallCredentials(null); + ClientStream stream = transport.newStream(method, origHeaders, callOptions); + + verify(mockTransport).newStream(method, origHeaders, callOptions); + assertSame(mockStream, stream); + assertNull(origHeaders.get(CREDS_KEY)); + assertEquals(ORIG_HEADER_VALUE, origHeaders.get(ORIG_HEADER_KEY)); + } +} diff --git a/core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java b/core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java index c6a9bfb43..a8ca1219b 100644 --- a/core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java +++ b/core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.when; import io.grpc.Attributes; import io.grpc.CallCredentials; -import io.grpc.CallCredentials.MetadataApplier; import io.grpc.CallOptions; import io.grpc.IntegerMarshaller; import io.grpc.Metadata; @@ -55,6 +54,7 @@ import org.mockito.stubbing.Answer; * CallCredentialsApplyingTransportFactory} and {@link MetadataApplierImpl}. */ @RunWith(JUnit4.class) +@Deprecated public class CallCredentialsApplyingTest { @Mock private ClientTransportFactory mockTransportFactory; @@ -127,7 +127,7 @@ public class CallCredentialsApplyingTest { ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null); verify(mockCreds).applyRequestMetadata(same(method), attrsCaptor.capture(), same(mockExecutor), - any(MetadataApplier.class)); + any(CallCredentials.MetadataApplier.class)); Attributes attrs = attrsCaptor.getValue(); assertSame(ATTR_VALUE, attrs.get(ATTR_KEY)); assertSame(AUTHORITY, attrs.get(CallCredentials.ATTR_AUTHORITY)); @@ -147,7 +147,7 @@ public class CallCredentialsApplyingTest { ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null); verify(mockCreds).applyRequestMetadata(same(method), attrsCaptor.capture(), same(mockExecutor), - any(MetadataApplier.class)); + any(CallCredentials.MetadataApplier.class)); Attributes attrs = attrsCaptor.getValue(); assertSame(ATTR_VALUE, attrs.get(ATTR_KEY)); assertEquals("transport-override-authority", attrs.get(CallCredentials.ATTR_AUTHORITY)); @@ -169,7 +169,7 @@ public class CallCredentialsApplyingTest { ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null); verify(mockCreds).applyRequestMetadata(same(method), attrsCaptor.capture(), - same(anotherExecutor), any(MetadataApplier.class)); + same(anotherExecutor), any(CallCredentials.MetadataApplier.class)); Attributes attrs = attrsCaptor.getValue(); assertSame(ATTR_VALUE, attrs.get(ATTR_KEY)); assertEquals("calloptions-authority", attrs.get(CallCredentials.ATTR_AUTHORITY)); @@ -181,7 +181,8 @@ public class CallCredentialsApplyingTest { final RuntimeException ex = new RuntimeException(); when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY); doThrow(ex).when(mockCreds).applyRequestMetadata( - same(method), any(Attributes.class), same(mockExecutor), any(MetadataApplier.class)); + same(method), any(Attributes.class), same(mockExecutor), + any(CallCredentials.MetadataApplier.class)); FailingClientStream stream = (FailingClientStream) transport.newStream(method, origHeaders, callOptions); @@ -197,14 +198,15 @@ public class CallCredentialsApplyingTest { doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - MetadataApplier applier = (MetadataApplier) invocation.getArguments()[3]; + CallCredentials.MetadataApplier applier = + (CallCredentials.MetadataApplier) invocation.getArguments()[3]; Metadata headers = new Metadata(); headers.put(CREDS_KEY, CREDS_VALUE); applier.apply(headers); return null; } }).when(mockCreds).applyRequestMetadata(same(method), any(Attributes.class), - same(mockExecutor), any(MetadataApplier.class)); + same(mockExecutor), any(CallCredentials.MetadataApplier.class)); ClientStream stream = transport.newStream(method, origHeaders, callOptions); @@ -221,12 +223,13 @@ public class CallCredentialsApplyingTest { doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - MetadataApplier applier = (MetadataApplier) invocation.getArguments()[3]; + CallCredentials.MetadataApplier applier = + (CallCredentials.MetadataApplier) invocation.getArguments()[3]; applier.fail(error); return null; } }).when(mockCreds).applyRequestMetadata(same(method), any(Attributes.class), - same(mockExecutor), any(MetadataApplier.class)); + same(mockExecutor), any(CallCredentials.MetadataApplier.class)); FailingClientStream stream = (FailingClientStream) transport.newStream(method, origHeaders, callOptions); @@ -242,7 +245,7 @@ public class CallCredentialsApplyingTest { // Will call applyRequestMetadata(), which is no-op. DelayedStream stream = (DelayedStream) transport.newStream(method, origHeaders, callOptions); - ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null); + ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null); verify(mockCreds).applyRequestMetadata(same(method), any(Attributes.class), same(mockExecutor), applierCaptor.capture()); verify(mockTransport, never()).newStream(method, origHeaders, callOptions); @@ -264,7 +267,7 @@ public class CallCredentialsApplyingTest { // Will call applyRequestMetadata(), which is no-op. DelayedStream stream = (DelayedStream) transport.newStream(method, origHeaders, callOptions); - ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null); + ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null); verify(mockCreds).applyRequestMetadata(same(method), any(Attributes.class), same(mockExecutor), applierCaptor.capture()); diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java index 44bde2222..3f7adfa57 100644 --- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java +++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java @@ -18,6 +18,7 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -27,13 +28,10 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; import com.google.common.net.InetAddresses; -import com.google.common.testing.FakeTicker; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.NameResolver; @@ -42,6 +40,7 @@ import io.grpc.internal.DnsNameResolver.ResolutionResults; import io.grpc.internal.DnsNameResolver.ResourceResolver; import io.grpc.internal.DnsNameResolver.ResourceResolverFactory; import io.grpc.internal.SharedResourceHolder.Resource; +import java.io.IOException; import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -55,8 +54,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -108,25 +105,21 @@ public class DnsNameResolverTest { private NameResolver.Listener mockListener; @Captor private ArgumentCaptor<List<EquivalentAddressGroup>> resultCaptor; - @Nullable - private String networkaddressCacheTtlPropertyValue; private DnsNameResolver newResolver(String name, int port) { - return newResolver(name, port, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted()); + return newResolver(name, port, GrpcUtil.NOOP_PROXY_DETECTOR); } private DnsNameResolver newResolver( String name, int port, - ProxyDetector proxyDetector, - Stopwatch stopwatch) { + ProxyDetector proxyDetector) { DnsNameResolver dnsResolver = new DnsNameResolver( null, name, Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, port).build(), fakeExecutorResource, - proxyDetector, - stopwatch); + proxyDetector); return dnsResolver; } @@ -134,19 +127,6 @@ public class DnsNameResolverTest { public void setUp() { MockitoAnnotations.initMocks(this); DnsNameResolver.enableJndi = true; - networkaddressCacheTtlPropertyValue = - System.getProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY); - } - - @After - public void restoreSystemProperty() { - if (networkaddressCacheTtlPropertyValue == null) { - System.clearProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY); - } else { - System.setProperty( - DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, - networkaddressCacheTtlPropertyValue); - } } @After @@ -198,8 +178,7 @@ public class DnsNameResolverTest { } @Test - public void resolve_neverCache() throws Exception { - System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "0"); + public void resolve() throws Exception { final List<InetAddress> answer1 = createAddressList(2); final List<InetAddress> answer2 = createAddressList(1); String name = "foo.googleapis.com"; @@ -227,156 +206,6 @@ public class DnsNameResolverTest { } @Test - public void resolve_cacheForever() throws Exception { - System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "-1"); - final List<InetAddress> answer1 = createAddressList(2); - String name = "foo.googleapis.com"; - FakeTicker fakeTicker = new FakeTicker(); - - DnsNameResolver resolver = - newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker)); - AddressResolver mockResolver = mock(AddressResolver.class); - when(mockResolver.resolveAddress(Matchers.anyString())) - .thenReturn(answer1) - .thenThrow(new AssertionError("should not called twice")); - resolver.setAddressResolver(mockResolver); - - resolver.start(mockListener); - assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class)); - assertAnswerMatches(answer1, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - fakeTicker.advance(1, TimeUnit.DAYS); - resolver.refresh(); - assertEquals(1, fakeExecutor.runDueTasks()); - verifyNoMoreInteractions(mockListener); - assertAnswerMatches(answer1, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - resolver.shutdown(); - - verify(mockResolver).resolveAddress(Matchers.anyString()); - } - - @Test - public void resolve_usingCache() throws Exception { - long ttl = 60; - System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, Long.toString(ttl)); - final List<InetAddress> answer = createAddressList(2); - String name = "foo.googleapis.com"; - FakeTicker fakeTicker = new FakeTicker(); - - DnsNameResolver resolver = - newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker)); - AddressResolver mockResolver = mock(AddressResolver.class); - when(mockResolver.resolveAddress(Matchers.anyString())) - .thenReturn(answer) - .thenThrow(new AssertionError("should not reach here.")); - resolver.setAddressResolver(mockResolver); - - resolver.start(mockListener); - assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class)); - assertAnswerMatches(answer, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - // this refresh should return cached result - fakeTicker.advance(ttl - 1, TimeUnit.SECONDS); - resolver.refresh(); - assertEquals(1, fakeExecutor.runDueTasks()); - verifyNoMoreInteractions(mockListener); - assertAnswerMatches(answer, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - resolver.shutdown(); - - verify(mockResolver).resolveAddress(Matchers.anyString()); - } - - @Test - public void resolve_cacheExpired() throws Exception { - long ttl = 60; - System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, Long.toString(ttl)); - final List<InetAddress> answer1 = createAddressList(2); - final List<InetAddress> answer2 = createAddressList(1); - String name = "foo.googleapis.com"; - FakeTicker fakeTicker = new FakeTicker(); - - DnsNameResolver resolver = - newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker)); - AddressResolver mockResolver = mock(AddressResolver.class); - when(mockResolver.resolveAddress(Matchers.anyString())).thenReturn(answer1).thenReturn(answer2); - resolver.setAddressResolver(mockResolver); - - resolver.start(mockListener); - assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class)); - assertAnswerMatches(answer1, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - fakeTicker.advance(ttl + 1, TimeUnit.SECONDS); - resolver.refresh(); - assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockListener, times(2)).onAddresses(resultCaptor.capture(), any(Attributes.class)); - assertAnswerMatches(answer2, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - resolver.shutdown(); - - verify(mockResolver, times(2)).resolveAddress(Matchers.anyString()); - } - - @Test - public void resolve_invalidTtlPropertyValue() throws Exception { - System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "not_a_number"); - resolveDefaultValue(); - } - - @Test - public void resolve_noPropertyValue() throws Exception { - System.clearProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY); - resolveDefaultValue(); - } - - private void resolveDefaultValue() throws Exception { - final List<InetAddress> answer1 = createAddressList(2); - final List<InetAddress> answer2 = createAddressList(1); - String name = "foo.googleapis.com"; - FakeTicker fakeTicker = new FakeTicker(); - - DnsNameResolver resolver = - newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker)); - AddressResolver mockResolver = mock(AddressResolver.class); - when(mockResolver.resolveAddress(Matchers.anyString())).thenReturn(answer1).thenReturn(answer2); - resolver.setAddressResolver(mockResolver); - - resolver.start(mockListener); - assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class)); - assertAnswerMatches(answer1, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - fakeTicker.advance(DnsNameResolver.DEFAULT_NETWORK_CACHE_TTL_SECONDS, TimeUnit.SECONDS); - resolver.refresh(); - assertEquals(1, fakeExecutor.runDueTasks()); - verifyNoMoreInteractions(mockListener); - assertAnswerMatches(answer1, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - fakeTicker.advance(1, TimeUnit.SECONDS); - resolver.refresh(); - assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockListener, times(2)).onAddresses(resultCaptor.capture(), any(Attributes.class)); - assertAnswerMatches(answer2, 81, resultCaptor.getValue()); - assertEquals(0, fakeClock.numPendingTasks()); - - resolver.shutdown(); - - verify(mockResolver, times(2)).resolveAddress(Matchers.anyString()); - } - - @Test public void resolveAll_nullResourceResolver() throws Exception { final String hostname = "addr.fake"; final Inet4Address backendAddr = InetAddresses.fromInteger(0x7f000001); @@ -397,6 +226,23 @@ public class DnsNameResolverTest { } @Test + public void resolveAll_nullResourceResolver_addressFailure() throws Exception { + final String hostname = "addr.fake"; + + AddressResolver mockResolver = mock(AddressResolver.class); + when(mockResolver.resolveAddress(Matchers.anyString())) + .thenThrow(new IOException("no addr")); + ResourceResolver resourceResolver = null; + boolean resovleSrv = true; + boolean resolveTxt = true; + + thrown.expect(RuntimeException.class); + thrown.expectMessage("no addr"); + + DnsNameResolver.resolveAll(mockResolver, resourceResolver, resovleSrv, resolveTxt, hostname); + } + + @Test public void resolveAll_presentResourceResolver() throws Exception { final String hostname = "addr.fake"; final Inet4Address backendAddr = InetAddresses.fromInteger(0x7f000001); @@ -502,8 +348,7 @@ public class DnsNameResolverTest { "password"); when(alwaysDetectProxy.proxyFor(any(SocketAddress.class))) .thenReturn(proxyParameters); - DnsNameResolver resolver = - newResolver(name, port, alwaysDetectProxy, Stopwatch.createUnstarted()); + DnsNameResolver resolver = newResolver(name, port, alwaysDetectProxy); AddressResolver mockAddressResolver = mock(AddressResolver.class); when(mockAddressResolver.resolveAddress(Matchers.anyString())).thenThrow(new AssertionError()); resolver.setAddressResolver(mockAddressResolver); @@ -754,6 +599,82 @@ public class DnsNameResolverTest { assertNotNull(DnsNameResolver.maybeChooseServiceConfig(choice, new Random(), "localhost")); } + @Test + public void shouldUseJndi_alwaysFalseIfDisabled() { + boolean enableJndi = false; + boolean enableJndiLocalhost = true; + String host = "seemingly.valid.host"; + + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, host)); + } + + @Test + public void shouldUseJndi_falseIfDisabledForLocalhost() { + boolean enableJndi = true; + boolean enableJndiLocalhost = false; + + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "localhost")); + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "LOCALHOST")); + } + + @Test + public void shouldUseJndi_trueIfLocalhostOverriden() { + boolean enableJndi = true; + boolean enableJndiLocalhost = true; + String host = "localhost"; + + assertTrue(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, host)); + } + + @Test + public void shouldUseJndi_falseForIpv6() { + boolean enableJndi = true; + boolean enableJndiLocalhost = false; + + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "::")); + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "::1")); + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "2001:db8:1234::")); + assertFalse(DnsNameResolver.shouldUseJndi( + enableJndi, enableJndiLocalhost, "[2001:db8:1234::]")); + assertFalse(DnsNameResolver.shouldUseJndi( + enableJndi, enableJndiLocalhost, "2001:db8:1234::%3")); + } + + @Test + public void shouldUseJndi_falseForIpv4() { + boolean enableJndi = true; + boolean enableJndiLocalhost = false; + + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "127.0.0.1")); + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "192.168.0.1")); + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "134744072")); + } + + @Test + public void shouldUseJndi_falseForEmpty() { + boolean enableJndi = true; + boolean enableJndiLocalhost = false; + + assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "")); + } + + @Test + public void shouldUseJndi_trueIfItMightPossiblyBeValid() { + boolean enableJndi = true; + boolean enableJndiLocalhost = false; + + assertTrue(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "remotehost")); + assertTrue(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "remotehost.gov")); + assertTrue(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "f.q.d.n.")); + assertTrue(DnsNameResolver.shouldUseJndi( + enableJndi, enableJndiLocalhost, "8.8.8.8.in-addr.arpa.")); + assertTrue(DnsNameResolver.shouldUseJndi( + enableJndi, enableJndiLocalhost, "2001-db8-1234--as3.ipv6-literal.net")); + + + + } + private void testInvalidUri(URI uri) { try { provider.newNameResolver(uri, NAME_RESOLVER_PARAMS); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index ffc92b1bf..c5fdf713e 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -57,7 +57,6 @@ import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.BinaryLog; import io.grpc.CallCredentials; -import io.grpc.CallCredentials.MetadataApplier; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -1388,6 +1387,7 @@ public class ManagedChannelImplTest { * propagated to newStream() and applyRequestMetadata(). */ @Test + @SuppressWarnings("deprecation") public void informationPropagatedToNewStreamAndCallCredentials() { createChannel(); CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds); @@ -1401,9 +1401,9 @@ public class ManagedChannelImplTest { credsApplyContexts.add(Context.current()); return null; } - }).when(creds).applyRequestMetadata( + }).when(creds).applyRequestMetadata( // TODO(zhangkun83): remove suppression of deprecations any(MethodDescriptor.class), any(Attributes.class), any(Executor.class), - any(MetadataApplier.class)); + any(CallCredentials.MetadataApplier.class)); // First call will be on delayed transport. Only newCall() is run within the expected context, // so that we can verify that the context is explicitly attached before calling newStream() and @@ -1435,7 +1435,7 @@ public class ManagedChannelImplTest { verify(creds, never()).applyRequestMetadata( any(MethodDescriptor.class), any(Attributes.class), any(Executor.class), - any(MetadataApplier.class)); + any(CallCredentials.MetadataApplier.class)); // applyRequestMetadata() is called after the transport becomes ready. transportInfo.listener.transportReady(); @@ -1444,7 +1444,8 @@ public class ManagedChannelImplTest { helper.updateBalancingState(READY, mockPicker); executor.runDueTasks(); ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class); - ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class); + ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor + = ArgumentCaptor.forClass(CallCredentials.MetadataApplier.class); verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(), same(executor.getScheduledExecutorService()), applierCaptor.capture()); assertEquals("testValue", testKey.get(credsApplyContexts.poll())); @@ -2761,7 +2762,7 @@ public class ManagedChannelImplTest { ManagedChannel mychannel = new CustomBuilder() .nameResolverFactory(factory) - .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory()).build(); + .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory(null, null)).build(); ClientCall<Void, Void> call1 = mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT); @@ -2801,6 +2802,14 @@ public class ManagedChannelImplTest { mychannel.shutdownNow(); } + @Test + public void getAuthorityAfterShutdown() throws Exception { + createChannel(); + assertEquals(SERVICE_NAME, channel.authority()); + channel.shutdownNow().awaitTermination(1, TimeUnit.SECONDS); + assertEquals(SERVICE_NAME, channel.authority()); + } + private static final class ChannelBuilder extends AbstractManagedChannelImplBuilder<ChannelBuilder> { diff --git a/core/src/test/java/io/grpc/util/ForwardingLoadBalancerHelperTest.java b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerHelperTest.java new file mode 100644 index 000000000..ebc883b9a --- /dev/null +++ b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerHelperTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static org.mockito.Mockito.mock; + +import io.grpc.EquivalentAddressGroup; +import io.grpc.ForwardingTestUtil; +import io.grpc.LoadBalancer; +import java.lang.reflect.Method; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ForwardingLoadBalancerHelper}. */ +@RunWith(JUnit4.class) +public class ForwardingLoadBalancerHelperTest { + private final LoadBalancer.Helper mockDelegate = mock(LoadBalancer.Helper.class); + + private final class TestHelper extends ForwardingLoadBalancerHelper { + @Override + protected LoadBalancer.Helper delegate() { + return mockDelegate; + } + } + + @Test + public void allMethodsForwarded() throws Exception { + final SocketAddress mockAddr = mock(SocketAddress.class); + ForwardingTestUtil.testMethodsForwarded( + LoadBalancer.Helper.class, + mockDelegate, + new TestHelper(), + Collections.<Method>emptyList(), + new ForwardingTestUtil.ArgumentProvider() { + @Override + public Object get(Class<?> clazz) { + if (clazz.equals(EquivalentAddressGroup.class)) { + return new EquivalentAddressGroup(Arrays.asList(mockAddr)); + } else if (clazz.equals(List.class)) { + return Collections.<Object>emptyList(); + } + return null; + } + }); + } +} |