aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHadrien Zalek <hzalek@google.com>2020-07-24 12:30:10 -0700
committerHadrien Zalek <hzalek@google.com>2020-07-24 23:35:41 +0000
commit9b4675b8aba18f88cdb08004aac053ce16c4968a (patch)
tree900389d86d6c0d165b93722f60de46164ecc6255 /core
parent332041b0592239d4bfe59bfd316f28c02f523570 (diff)
parent57043233bf5aecce92f0c6629b6ac46d9393ce8c (diff)
downloadgrpc-grpc-java-9b4675b8aba18f88cdb08004aac053ce16c4968a.tar.gz
Merge tag 'upstream/v1.16.1' into HEAD
Update the Java gRPC implementation source to that of a released version (v1.16.1) instead of some intermediate commit after v1.15.0. Test: m grpc-java Bug: 148404241 Change-Id: I9c072aee054a4aecc1bdf39adf45e9a243b907f5
Diffstat (limited to 'core')
-rw-r--r--core/build.gradle13
-rw-r--r--core/src/main/java/io/grpc/Attributes.java12
-rw-r--r--core/src/main/java/io/grpc/BinaryLog.java17
-rw-r--r--core/src/main/java/io/grpc/CallCredentials.java40
-rw-r--r--core/src/main/java/io/grpc/CallCredentials2.java99
-rw-r--r--core/src/main/java/io/grpc/ClientCall.java3
-rw-r--r--core/src/main/java/io/grpc/EquivalentAddressGroup.java17
-rw-r--r--core/src/main/java/io/grpc/Grpc.java26
-rw-r--r--core/src/main/java/io/grpc/LoadBalancer.java10
-rw-r--r--core/src/main/java/io/grpc/NameResolver.java18
-rw-r--r--core/src/main/java/io/grpc/ServerCall.java2
-rw-r--r--core/src/main/java/io/grpc/inprocess/InProcessTransport.java5
-rw-r--r--core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java37
-rw-r--r--core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java1
-rw-r--r--core/src/main/java/io/grpc/internal/DnsNameResolver.java87
-rw-r--r--core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java4
-rw-r--r--core/src/main/java/io/grpc/internal/GrpcAttributes.java16
-rw-r--r--core/src/main/java/io/grpc/internal/GrpcUtil.java2
-rw-r--r--core/src/main/java/io/grpc/internal/ManagedChannelImpl.java618
-rw-r--r--core/src/main/java/io/grpc/internal/MetadataApplierImpl.java4
-rw-r--r--core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java95
-rw-r--r--core/src/test/java/io/grpc/ForwardingTestUtil.java54
-rw-r--r--core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java80
-rw-r--r--core/src/test/java/io/grpc/internal/CallCredentials2ApplyingTest.java291
-rw-r--r--core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java25
-rw-r--r--core/src/test/java/io/grpc/internal/DnsNameResolverTest.java279
-rw-r--r--core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java21
-rw-r--r--core/src/test/java/io/grpc/util/ForwardingLoadBalancerHelperTest.java65
28 files changed, 1365 insertions, 576 deletions
diff --git a/core/build.gradle b/core/build.gradle
index 700592f86..4a2429098 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -3,12 +3,19 @@ description = 'gRPC: Core'
dependencies {
compile project(':grpc-context'),
libraries.gson,
- libraries.guava,
libraries.errorprone,
libraries.jsr305,
libraries.animalsniffer_annotations
+ compile (libraries.guava) {
+ // prefer 2.2.0 from libraries instead of 2.1.3
+ exclude group: 'com.google.errorprone', module: 'error_prone_annotations'
+ // prefer 3.0.2 from libraries instead of 3.0.1
+ exclude group: 'com.google.code.findbugs', module: 'jsr305'
+ // prefer 1.17 from libraries instead of 1.14
+ exclude group: 'org.codehaus.mojo', module: 'animal-sniffer-annotations'
+ }
compile (libraries.opencensus_api) {
- // prefer 3.0.0 from libraries instead of 3.0.1
+ // prefer 3.0.2 from libraries instead of 3.0.1
exclude group: 'com.google.code.findbugs', module: 'jsr305'
// prefer 20.0 from libraries instead of 19.0
exclude group: 'com.google.guava', module: 'guava'
@@ -16,7 +23,7 @@ dependencies {
exclude group: 'io.grpc', module: 'grpc-context'
}
compile (libraries.opencensus_contrib_grpc_metrics) {
- // prefer 3.0.0 from libraries instead of 3.0.1
+ // prefer 3.0.2 from libraries instead of 3.0.1
exclude group: 'com.google.code.findbugs', module: 'jsr305'
// we'll always be more up-to-date
exclude group: 'io.grpc', module: 'grpc-context'
diff --git a/core/src/main/java/io/grpc/Attributes.java b/core/src/main/java/io/grpc/Attributes.java
index 9a13f5160..411d11c33 100644
--- a/core/src/main/java/io/grpc/Attributes.java
+++ b/core/src/main/java/io/grpc/Attributes.java
@@ -29,6 +29,18 @@ import javax.annotation.concurrent.Immutable;
/**
* An immutable type-safe container of attributes.
+ *
+ * <h3>Annotation semantics</h3>
+ *
+ * <p>As a convention, annotations such as {@link Grpc.TransportAttr} is defined to associate
+ * attribute {@link Key}s and their propagation paths. The annotation may be applied to a {@code
+ * Key} definition field, a method that returns {@link Attributes}, or a variable of type {@link
+ * Attributes}, to indicate that the annotated {@link Attributes} objects may contain the annotated
+ * {@code Key}.
+ *
+ * <p>Javadoc users may click "USE" on the navigation bars of the annotation's javadoc page to view
+ * references of such annotation.
+ *
* @since 1.13.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1764")
diff --git a/core/src/main/java/io/grpc/BinaryLog.java b/core/src/main/java/io/grpc/BinaryLog.java
index 5c2d4ab2f..9d11b63cf 100644
--- a/core/src/main/java/io/grpc/BinaryLog.java
+++ b/core/src/main/java/io/grpc/BinaryLog.java
@@ -29,21 +29,4 @@ public abstract class BinaryLog implements Closeable {
ServerMethodDefinition<ReqT, RespT> oMethodDef);
public abstract Channel wrapChannel(Channel channel);
-
- /**
- * A CallId is two byte[] arrays both of size 8 that uniquely identifies the RPC. Users are
- * free to use the byte arrays however they see fit.
- */
- public static final class CallId {
- public final long hi;
- public final long lo;
-
- /**
- * Creates an instance.
- */
- public CallId(long hi, long lo) {
- this.hi = hi;
- this.lo = lo;
- }
- }
}
diff --git a/core/src/main/java/io/grpc/CallCredentials.java b/core/src/main/java/io/grpc/CallCredentials.java
index 7ce621993..03be65310 100644
--- a/core/src/main/java/io/grpc/CallCredentials.java
+++ b/core/src/main/java/io/grpc/CallCredentials.java
@@ -40,10 +40,15 @@ public interface CallCredentials {
* The security level of the transport. It is guaranteed to be present in the {@code attrs} passed
* to {@link #applyRequestMetadata}. It is by default {@link SecurityLevel#NONE} but can be
* overridden by the transport.
+ *
+ * @deprecated transport implementations should use {@code
+ * io.grpc.internal.GrpcAttributes.ATTR_SECURITY_LEVEL} instead.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914")
+ @Grpc.TransportAttr
+ @Deprecated
public static final Key<SecurityLevel> ATTR_SECURITY_LEVEL =
- Key.create("io.grpc.CallCredentials.securityLevel");
+ Key.create("io.grpc.internal.GrpcAttributes.securityLevel");
/**
* The authority string used to authenticate the server. Usually it's the server's host name. It
@@ -52,6 +57,8 @@ public interface CallCredentials {
* io.grpc.CallOptions} with increasing precedence.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914")
+ @Grpc.TransportAttr
+ @Deprecated
public static final Key<String> ATTR_AUTHORITY = Key.create("io.grpc.CallCredentials.authority");
/**
@@ -71,8 +78,11 @@ public interface CallCredentials {
* needs to perform blocking operations.
* @param applier The outlet of the produced headers. It can be called either before or after this
* method returns.
+ *
+ * @deprecated implement {@link CallCredentials2} instead.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914")
+ @Deprecated
void applyRequestMetadata(
MethodDescriptor<?, ?> method, Attributes attrs,
Executor appExecutor, MetadataApplier applier);
@@ -89,6 +99,7 @@ public interface CallCredentials {
*
* <p>Exactly one of its methods must be called to make the RPC proceed.
*/
+ @Deprecated
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914")
public interface MetadataApplier {
/**
@@ -102,4 +113,31 @@ public interface CallCredentials {
*/
void fail(Status status);
}
+
+ /**
+ * The request-related information passed to {@code CallCredentials2.applyRequestMetadata()}.
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914")
+ public abstract static class RequestInfo {
+ /**
+ * The method descriptor of this RPC.
+ */
+ public abstract MethodDescriptor<?, ?> getMethodDescriptor();
+
+ /**
+ * The security level on the transport.
+ */
+ public abstract SecurityLevel getSecurityLevel();
+
+ /**
+ * Returns the authority string used to authenticate the server for this call.
+ */
+ public abstract String getAuthority();
+
+ /**
+ * Returns the transport attributes.
+ */
+ @Grpc.TransportAttr
+ public abstract Attributes getTransportAttrs();
+ }
}
diff --git a/core/src/main/java/io/grpc/CallCredentials2.java b/core/src/main/java/io/grpc/CallCredentials2.java
new file mode 100644
index 000000000..998df42db
--- /dev/null
+++ b/core/src/main/java/io/grpc/CallCredentials2.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2016 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Executor;
+
+/**
+ * The new interface for {@link CallCredentials}.
+ *
+ * <p>THIS CLASS NAME IS TEMPORARY and is part of a migration. This class will BE DELETED as it
+ * replaces {@link CallCredentials} in short-term. THIS CLASS SHOULD ONLY BE REFERENCED BY
+ * IMPLEMENTIONS. All consumers should still reference {@link CallCredentials}.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4901")
+public abstract class CallCredentials2 implements CallCredentials {
+ /**
+ * Pass the credential data to the given {@link CallCredentials.MetadataApplier}, which will
+ * propagate it to the request metadata.
+ *
+ * <p>It is called for each individual RPC, within the {@link Context} of the call, before the
+ * stream is about to be created on a transport. Implementations should not block in this
+ * method. If metadata is not immediately available, e.g., needs to be fetched from network, the
+ * implementation may give the {@code applier} to an asynchronous task which will eventually call
+ * the {@code applier}. The RPC proceeds only after the {@code applier} is called.
+ *
+ * @param requestInfo request-related information
+ * @param appExecutor The application thread-pool. It is provided to the implementation in case it
+ * needs to perform blocking operations.
+ * @param applier The outlet of the produced headers. It can be called either before or after this
+ * method returns.
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914")
+ public abstract void applyRequestMetadata(
+ RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier);
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public final void applyRequestMetadata(
+ final MethodDescriptor<?, ?> method, final Attributes attrs,
+ Executor appExecutor, final CallCredentials.MetadataApplier applier) {
+ final String authority = checkNotNull(attrs.get(ATTR_AUTHORITY), "authority");
+ final SecurityLevel securityLevel =
+ firstNonNull(attrs.get(ATTR_SECURITY_LEVEL), SecurityLevel.NONE);
+ RequestInfo requestInfo = new RequestInfo() {
+ @Override
+ public MethodDescriptor<?, ?> getMethodDescriptor() {
+ return method;
+ }
+
+ @Override
+ public SecurityLevel getSecurityLevel() {
+ return securityLevel;
+ }
+
+ @Override
+ public String getAuthority() {
+ return authority;
+ }
+
+ @Override
+ public Attributes getTransportAttrs() {
+ return attrs;
+ }
+ };
+ MetadataApplier applierAdapter = new MetadataApplier() {
+ @Override
+ public void apply(Metadata headers) {
+ applier.apply(headers);
+ }
+
+ @Override
+ public void fail(Status status) {
+ applier.fail(status);
+ }
+ };
+ applyRequestMetadata(requestInfo, appExecutor, applierAdapter);
+ }
+
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1914")
+ @SuppressWarnings("deprecation")
+ public abstract static class MetadataApplier implements CallCredentials.MetadataApplier {}
+}
diff --git a/core/src/main/java/io/grpc/ClientCall.java b/core/src/main/java/io/grpc/ClientCall.java
index 936cb322b..2aa5034b7 100644
--- a/core/src/main/java/io/grpc/ClientCall.java
+++ b/core/src/main/java/io/grpc/ClientCall.java
@@ -262,12 +262,11 @@ public abstract class ClientCall<ReqT, RespT> {
* or {@link Listener#onClose}. If called prematurely, the implementation may throw {@code
* IllegalStateException} or return arbitrary {@code Attributes}.
*
- * <p>{@link Grpc} defines commonly used attributes, but they are not guaranteed to be present.
- *
* @return non-{@code null} attributes
* @throws IllegalStateException (optional) if called before permitted
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2607")
+ @Grpc.TransportAttr
public Attributes getAttributes() {
return Attributes.EMPTY;
}
diff --git a/core/src/main/java/io/grpc/EquivalentAddressGroup.java b/core/src/main/java/io/grpc/EquivalentAddressGroup.java
index 2218a7fae..eabd2eafb 100644
--- a/core/src/main/java/io/grpc/EquivalentAddressGroup.java
+++ b/core/src/main/java/io/grpc/EquivalentAddressGroup.java
@@ -17,6 +17,9 @@
package io.grpc;
import com.google.common.base.Preconditions;
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -50,7 +53,7 @@ public final class EquivalentAddressGroup {
/**
* List constructor with {@link Attributes}.
*/
- public EquivalentAddressGroup(List<SocketAddress> addrs, Attributes attrs) {
+ public EquivalentAddressGroup(List<SocketAddress> addrs, @Attr Attributes attrs) {
Preconditions.checkArgument(!addrs.isEmpty(), "addrs is empty");
this.addrs = Collections.unmodifiableList(new ArrayList<>(addrs));
this.attrs = Preconditions.checkNotNull(attrs, "attrs");
@@ -69,7 +72,7 @@ public final class EquivalentAddressGroup {
/**
* Singleton constructor with Attributes.
*/
- public EquivalentAddressGroup(SocketAddress addr, Attributes attrs) {
+ public EquivalentAddressGroup(SocketAddress addr, @Attr Attributes attrs) {
this(Collections.singletonList(addr), attrs);
}
@@ -83,6 +86,7 @@ public final class EquivalentAddressGroup {
/**
* Returns the attributes.
*/
+ @Attr
public Attributes getAttributes() {
return attrs;
}
@@ -127,4 +131,13 @@ public final class EquivalentAddressGroup {
}
return true;
}
+
+ /**
+ * Annotation for {@link EquivalentAddressGroup}'s attributes. It follows the annotation semantics
+ * defined by {@link Attributes}.
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4972")
+ @Retention(RetentionPolicy.SOURCE)
+ @Documented
+ public @interface Attr {}
}
diff --git a/core/src/main/java/io/grpc/Grpc.java b/core/src/main/java/io/grpc/Grpc.java
index aef072606..53ff28be3 100644
--- a/core/src/main/java/io/grpc/Grpc.java
+++ b/core/src/main/java/io/grpc/Grpc.java
@@ -16,6 +16,9 @@
package io.grpc;
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
import java.net.SocketAddress;
import javax.net.ssl.SSLSession;
@@ -31,13 +34,32 @@ public final class Grpc {
* Attribute key for the remote address of a transport.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
+ @TransportAttr
public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_REMOTE_ADDR =
- Attributes.Key.create("remote-addr");
+ Attributes.Key.create("remote-addr");
+
+ /**
+ * Attribute key for the local address of a transport.
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
+ @TransportAttr
+ public static final Attributes.Key<SocketAddress> TRANSPORT_ATTR_LOCAL_ADDR =
+ Attributes.Key.create("local-addr");
/**
* Attribute key for SSL session of a transport.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1710")
+ @TransportAttr
public static final Attributes.Key<SSLSession> TRANSPORT_ATTR_SSL_SESSION =
- Attributes.Key.create("ssl-session");
+ Attributes.Key.create("ssl-session");
+
+ /**
+ * Annotation for transport attributes. It follows the annotation semantics defined
+ * by {@link Attributes}.
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4972")
+ @Retention(RetentionPolicy.SOURCE)
+ @Documented
+ public @interface TransportAttr {}
}
diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java
index 633bf5263..f1cef63fb 100644
--- a/core/src/main/java/io/grpc/LoadBalancer.java
+++ b/core/src/main/java/io/grpc/LoadBalancer.java
@@ -107,11 +107,12 @@ public abstract class LoadBalancer {
* <p>Implementations should not modify the given {@code servers}.
*
* @param servers the resolved server addresses, never empty.
- * @param attributes extra metadata from naming system.
+ * @param attributes extra information from naming system.
* @since 1.2.0
*/
public abstract void handleResolvedAddressGroups(
- List<EquivalentAddressGroup> servers, Attributes attributes);
+ List<EquivalentAddressGroup> servers,
+ @NameResolver.ResolutionResultAttr Attributes attributes);
/**
* Handles an error from the name resolution system.
@@ -154,6 +155,11 @@ public abstract class LoadBalancer {
*/
public abstract void shutdown();
+ @Override
+ public String toString() {
+ return getClass().getSimpleName();
+ }
+
/**
* The main balancing logic. It <strong>must be thread-safe</strong>. Typically it should only
* synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
diff --git a/core/src/main/java/io/grpc/NameResolver.java b/core/src/main/java/io/grpc/NameResolver.java
index 60f819e4d..845dcb02f 100644
--- a/core/src/main/java/io/grpc/NameResolver.java
+++ b/core/src/main/java/io/grpc/NameResolver.java
@@ -16,6 +16,9 @@
package io.grpc;
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
import java.net.URI;
import java.util.List;
import javax.annotation.Nullable;
@@ -126,6 +129,7 @@ public abstract class NameResolver {
*
* @since 1.0.0
*/
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
@ThreadSafe
public interface Listener {
/**
@@ -134,10 +138,11 @@ public abstract class NameResolver {
* <p>Implementations will not modify the given {@code servers}.
*
* @param servers the resolved server addresses. An empty list will trigger {@link #onError}
- * @param attributes extra metadata from naming system
+ * @param attributes extra information from naming system.
* @since 1.3.0
*/
- void onAddresses(List<EquivalentAddressGroup> servers, Attributes attributes);
+ void onAddresses(
+ List<EquivalentAddressGroup> servers, @ResolutionResultAttr Attributes attributes);
/**
* Handles an error from the resolver. The listener is responsible for eventually invoking
@@ -148,4 +153,13 @@ public abstract class NameResolver {
*/
void onError(Status error);
}
+
+ /**
+ * Annotation for name resolution result attributes. It follows the annotation semantics defined
+ * by {@link Attributes}.
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4972")
+ @Retention(RetentionPolicy.SOURCE)
+ @Documented
+ public @interface ResolutionResultAttr {}
}
diff --git a/core/src/main/java/io/grpc/ServerCall.java b/core/src/main/java/io/grpc/ServerCall.java
index 106676fa7..07711e381 100644
--- a/core/src/main/java/io/grpc/ServerCall.java
+++ b/core/src/main/java/io/grpc/ServerCall.java
@@ -192,11 +192,11 @@ public abstract class ServerCall<ReqT, RespT> {
* Returns properties of a single call.
*
* <p>Attributes originate from the transport and can be altered by {@link ServerTransportFilter}.
- * {@link Grpc} defines commonly used attributes, but they are not guaranteed to be present.
*
* @return non-{@code null} Attributes container
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1779")
+ @Grpc.TransportAttr
public Attributes getAttributes() {
return Attributes.EMPTY;
}
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
index 7cf1a89ba..03e2e1a19 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
@@ -24,7 +24,6 @@ import com.google.common.base.MoreObjects;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
-import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Compressor;
import io.grpc.Deadline;
@@ -41,6 +40,7 @@ import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ConnectionClientTransport;
+import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream;
@@ -91,7 +91,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
@GuardedBy("this")
private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
private final Attributes attributes = Attributes.newBuilder()
- .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
+ .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build();
public InProcessTransport(String name, String authority, String userAgent) {
@@ -132,6 +132,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
synchronized (InProcessTransport.this) {
Attributes serverTransportAttrs = Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
+ .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
.build();
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
clientTransportListener.transportReady();
diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
index 80505bce3..a8f778a46 100644
--- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
+++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java
@@ -16,11 +16,14 @@
package io.grpc.internal;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
+import io.grpc.InternalChannelz.ChannelTrace;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
@@ -32,6 +35,7 @@ import java.lang.reflect.Method;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
@@ -43,11 +47,20 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
static final String GRPCLB_LOAD_BALANCER_FACTORY_NAME =
"io.grpc.grpclb.GrpclbLoadBalancerFactory";
- AutoConfiguredLoadBalancerFactory() {}
+ @Nullable
+ private final ChannelTracer channelTracer;
+ @Nullable
+ private final TimeProvider timeProvider;
+
+ AutoConfiguredLoadBalancerFactory(
+ @Nullable ChannelTracer channelTracer, @Nullable TimeProvider timeProvider) {
+ this.channelTracer = channelTracer;
+ this.timeProvider = timeProvider;
+ }
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
- return new AutoConfiguredLoadBalancer(helper);
+ return new AutoConfiguredLoadBalancer(helper, channelTracer, timeProvider);
}
private static final class NoopLoadBalancer extends LoadBalancer {
@@ -71,11 +84,21 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
private final Helper helper;
private LoadBalancer delegate;
private LoadBalancer.Factory delegateFactory;
+ @CheckForNull
+ private ChannelTracer channelTracer;
+ @Nullable
+ private final TimeProvider timeProvider;
- AutoConfiguredLoadBalancer(Helper helper) {
+ AutoConfiguredLoadBalancer(
+ Helper helper, @Nullable ChannelTracer channelTracer, @Nullable TimeProvider timeProvider) {
this.helper = helper;
delegateFactory = PickFirstBalancerFactory.getInstance();
delegate = delegateFactory.newLoadBalancer(helper);
+ this.channelTracer = channelTracer;
+ this.timeProvider = timeProvider;
+ if (channelTracer != null) {
+ checkNotNull(timeProvider, "timeProvider");
+ }
}
// Must be run inside ChannelExecutor.
@@ -101,7 +124,15 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory {
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker());
delegate.shutdown();
delegateFactory = newlbf;
+ LoadBalancer old = delegate;
delegate = delegateFactory.newLoadBalancer(helper);
+ if (channelTracer != null) {
+ channelTracer.reportEvent(new ChannelTrace.Event.Builder()
+ .setDescription("Load balancer changed from " + old + " to " + delegate)
+ .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
+ .setTimestampNanos(timeProvider.currentTimeNanos())
+ .build());
+ }
}
getDelegate().handleResolvedAddressGroups(servers, attributes);
}
diff --git a/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java b/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
index df2e0a0bd..e5141cf1e 100644
--- a/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/CallCredentialsApplyingTransportFactory.java
@@ -72,6 +72,7 @@ final class CallCredentialsApplyingTransportFactory implements ClientTransportFa
}
@Override
+ @SuppressWarnings("deprecation")
public ClientStream newStream(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
CallCredentials creds = callOptions.getCredentials();
diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolver.java b/core/src/main/java/io/grpc/internal/DnsNameResolver.java
index 490bd8255..dc1b783fd 100644
--- a/core/src/main/java/io/grpc/internal/DnsNameResolver.java
+++ b/core/src/main/java/io/grpc/internal/DnsNameResolver.java
@@ -20,7 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
@@ -43,7 +43,6 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -85,27 +84,18 @@ final class DnsNameResolver extends NameResolver {
private static final String JNDI_PROPERTY =
System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_jndi", "true");
+ private static final String JNDI_LOCALHOST_PROPERTY =
+ System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_jndi_localhost", "false");
private static final String JNDI_SRV_PROPERTY =
System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_grpclb", "false");
private static final String JNDI_TXT_PROPERTY =
System.getProperty("io.grpc.internal.DnsNameResolverProvider.enable_service_config", "false");
- /**
- * Java networking system properties name for caching DNS result.
- *
- * <p>Default value is -1 (cache forever) if security manager is installed. If security manager is
- * not installed, the ttl value is {@code null} which falls back to {@link
- * #DEFAULT_NETWORK_CACHE_TTL_SECONDS gRPC default value}.
- */
- @VisibleForTesting
- static final String NETWORKADDRESS_CACHE_TTL_PROPERTY = "networkaddress.cache.ttl";
- /** Default DNS cache duration if network cache ttl value is not specified ({@code null}). */
- @VisibleForTesting
- static final long DEFAULT_NETWORK_CACHE_TTL_SECONDS = 30;
-
@VisibleForTesting
static boolean enableJndi = Boolean.parseBoolean(JNDI_PROPERTY);
@VisibleForTesting
+ static boolean enableJndiLocalhost = Boolean.parseBoolean(JNDI_LOCALHOST_PROPERTY);
+ @VisibleForTesting
static boolean enableSrv = Boolean.parseBoolean(JNDI_SRV_PROPERTY);
@VisibleForTesting
static boolean enableTxt = Boolean.parseBoolean(JNDI_TXT_PROPERTY);
@@ -130,8 +120,6 @@ final class DnsNameResolver extends NameResolver {
private final String host;
private final int port;
private final Resource<ExecutorService> executorResource;
- private final long networkAddressCacheTtlNanos;
- private final Stopwatch stopwatch;
@GuardedBy("this")
private boolean shutdown;
@GuardedBy("this")
@@ -140,11 +128,10 @@ final class DnsNameResolver extends NameResolver {
private boolean resolving;
@GuardedBy("this")
private Listener listener;
- private ResolutionResults cachedResolutionResults;
DnsNameResolver(@Nullable String nsAuthority, String name, Attributes params,
- Resource<ExecutorService> executorResource, ProxyDetector proxyDetector,
- Stopwatch stopwatch) {
+ Resource<ExecutorService> executorResource,
+ ProxyDetector proxyDetector) {
// TODO: if a DNS server is provided as nsAuthority, use it.
// https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java
this.executorResource = executorResource;
@@ -167,8 +154,6 @@ final class DnsNameResolver extends NameResolver {
port = nameUri.getPort();
}
this.proxyDetector = proxyDetector;
- this.stopwatch = Preconditions.checkNotNull(stopwatch, "stopwatch");
- this.networkAddressCacheTtlNanos = getNetworkAddressCacheTtlNanos();
}
@Override
@@ -198,13 +183,6 @@ final class DnsNameResolver extends NameResolver {
if (shutdown) {
return;
}
- boolean resourceRefreshRequired = cachedResolutionResults == null
- || networkAddressCacheTtlNanos == 0
- || (networkAddressCacheTtlNanos > 0
- && stopwatch.elapsed(TimeUnit.NANOSECONDS) > networkAddressCacheTtlNanos);
- if (!resourceRefreshRequired) {
- return;
- }
savedListener = listener;
resolving = true;
}
@@ -229,15 +207,11 @@ final class DnsNameResolver extends NameResolver {
ResolutionResults resolutionResults;
try {
ResourceResolver resourceResolver = null;
- if (enableJndi) {
+ if (shouldUseJndi(enableJndi, enableJndiLocalhost, host)) {
resourceResolver = getResourceResolver();
}
resolutionResults =
resolveAll(addressResolver, resourceResolver, enableSrv, enableTxt, host);
- cachedResolutionResults = resolutionResults;
- if (networkAddressCacheTtlNanos > 0) {
- stopwatch.reset().start();
- }
} catch (Exception e) {
savedListener.onError(
Status.UNAVAILABLE.withDescription("Unable to resolve host " + host).withCause(e));
@@ -284,23 +258,6 @@ final class DnsNameResolver extends NameResolver {
}
};
- /** Returns value of network address cache ttl property. */
- private static long getNetworkAddressCacheTtlNanos() {
- String cacheTtlPropertyValue = System.getProperty(NETWORKADDRESS_CACHE_TTL_PROPERTY);
- long cacheTtl = DEFAULT_NETWORK_CACHE_TTL_SECONDS;
- if (cacheTtlPropertyValue != null) {
- try {
- cacheTtl = Long.parseLong(cacheTtlPropertyValue);
- } catch (NumberFormatException e) {
- logger.log(
- Level.WARNING,
- "Property({0}) valid is not valid number format({1}), fall back to default({2})",
- new Object[] {NETWORKADDRESS_CACHE_TTL_PROPERTY, cacheTtlPropertyValue, cacheTtl});
- }
- }
- return cacheTtl > 0 ? TimeUnit.SECONDS.toNanos(cacheTtl) : cacheTtl;
- }
-
@GuardedBy("this")
private void resolve() {
if (resolving || shutdown) {
@@ -368,7 +325,9 @@ final class DnsNameResolver extends NameResolver {
}
}
try {
- if (addressesException != null && balancerAddressesException != null) {
+ if (addressesException != null
+ && (balancerAddressesException != null || balancerAddresses.isEmpty())) {
+ Throwables.throwIfUnchecked(addressesException);
throw new RuntimeException(addressesException);
}
} finally {
@@ -626,4 +585,28 @@ final class DnsNameResolver extends NameResolver {
}
return localHostname;
}
+
+ @VisibleForTesting
+ static boolean shouldUseJndi(boolean jndiEnabled, boolean jndiLocalhostEnabled, String target) {
+ if (!jndiEnabled) {
+ return false;
+ }
+ if ("localhost".equalsIgnoreCase(target)) {
+ return jndiLocalhostEnabled;
+ }
+ // Check if this name looks like IPv6
+ if (target.contains(":")) {
+ return false;
+ }
+ // Check if this might be IPv4. Such addresses have no alphabetic characters. This also
+ // checks the target is empty.
+ boolean alldigits = true;
+ for (int i = 0; i < target.length(); i++) {
+ char c = target.charAt(i);
+ if (c != '.') {
+ alldigits &= (c >= '0' && c <= '9');
+ }
+ }
+ return !alldigits;
+ }
}
diff --git a/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java b/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
index d0db539d4..cddbe3f3b 100644
--- a/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
+++ b/core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
@@ -17,7 +17,6 @@
package io.grpc.internal;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import io.grpc.Attributes;
import io.grpc.NameResolverProvider;
import java.net.URI;
@@ -53,8 +52,7 @@ public final class DnsNameResolverProvider extends NameResolverProvider {
name,
params,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
- GrpcUtil.getDefaultProxyDetector(),
- Stopwatch.createUnstarted());
+ GrpcUtil.getDefaultProxyDetector());
} else {
return null;
}
diff --git a/core/src/main/java/io/grpc/internal/GrpcAttributes.java b/core/src/main/java/io/grpc/internal/GrpcAttributes.java
index 67da06fa7..6a63864f6 100644
--- a/core/src/main/java/io/grpc/internal/GrpcAttributes.java
+++ b/core/src/main/java/io/grpc/internal/GrpcAttributes.java
@@ -17,6 +17,10 @@
package io.grpc.internal;
import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.Grpc;
+import io.grpc.NameResolver;
+import io.grpc.SecurityLevel;
import java.util.Map;
/**
@@ -26,6 +30,7 @@ public final class GrpcAttributes {
/**
* Attribute key for service config.
*/
+ @NameResolver.ResolutionResultAttr
public static final Attributes.Key<Map<String, Object>> NAME_RESOLVER_SERVICE_CONFIG =
Attributes.Key.create("service-config");
@@ -33,6 +38,7 @@ public final class GrpcAttributes {
* The naming authority of a gRPC LB server address. It is an address-group-level attribute,
* present when the address group is a LoadBalancer.
*/
+ @EquivalentAddressGroup.Attr
public static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY =
Attributes.Key.create("io.grpc.grpclb.lbAddrAuthority");
@@ -40,8 +46,18 @@ public final class GrpcAttributes {
* Whether this EquivalentAddressGroup was provided by a GRPCLB server. It would be rare for this
* value to be {@code false}; generally it would be better to not have the key present at all.
*/
+ @EquivalentAddressGroup.Attr
public static final Attributes.Key<Boolean> ATTR_LB_PROVIDED_BACKEND =
Attributes.Key.create("io.grpc.grpclb.lbProvidedBackend");
+ /**
+ * The security level of the transport. If it's not present, {@link SecurityLevel#NONE} should be
+ * assumed.
+ */
+ @SuppressWarnings("deprecation")
+ @Grpc.TransportAttr
+ public static final Attributes.Key<SecurityLevel> ATTR_SECURITY_LEVEL =
+ io.grpc.CallCredentials.ATTR_SECURITY_LEVEL;
+
private GrpcAttributes() {}
}
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 539829941..80fd2f52c 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -200,7 +200,7 @@ public final class GrpcUtil {
public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults();
- private static final String IMPLEMENTATION_VERSION = "1.16.0-SNAPSHOT"; // CURRENT_GRPC_VERSION
+ private static final String IMPLEMENTATION_VERSION = "1.16.1"; // CURRENT_GRPC_VERSION
/**
* The default delay in nanos before we send a keepalive.
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index dc203b016..e1f840ee3 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -125,13 +125,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final TimeProvider timeProvider;
private final int maxTraceEvents;
- private final ChannelExecutor channelExecutor = new ChannelExecutor() {
- @Override
- void handleUncaughtThrowable(Throwable t) {
- super.handleUncaughtThrowable(t);
- panic(t);
- }
- };
+ private final ChannelExecutor channelExecutor = new PanicChannelExecutor();
private boolean fullStreamDecompression;
@@ -236,34 +230,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
// Called from channelExecutor
private final ManagedClientTransport.Listener delayedTransportListener =
- new ManagedClientTransport.Listener() {
- @Override
- public void transportShutdown(Status s) {
- checkState(shutdown.get(), "Channel must have been shut down");
- }
-
- @Override
- public void transportReady() {
- // Don't care
- }
-
- @Override
- public void transportInUse(final boolean inUse) {
- inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
- }
-
- @Override
- public void transportTerminated() {
- checkState(shutdown.get(), "Channel must have been shut down");
- terminating = true;
- shutdownNameResolverAndLoadBalancer(false);
- // No need to call channelStateManager since we are already in SHUTDOWN state.
- // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them
- // here.
- maybeShutdownNowSubchannels();
- maybeTerminateChannel();
- }
- };
+ new DelayedTransportListener();
// Must be called from channelExecutor
private void maybeShutdownNowSubchannels() {
@@ -279,27 +246,12 @@ final class ManagedChannelImpl extends ManagedChannel implements
// Must be accessed from channelExecutor
@VisibleForTesting
- final InUseStateAggregator<Object> inUseStateAggregator =
- new InUseStateAggregator<Object>() {
- @Override
- void handleInUse() {
- exitIdleMode();
- }
-
- @Override
- void handleNotInUse() {
- if (shutdown.get()) {
- return;
- }
- rescheduleIdleTimer();
- }
- };
+ final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
@Override
public ListenableFuture<ChannelStats> getStats() {
final SettableFuture<ChannelStats> ret = SettableFuture.create();
- // subchannels and oobchannels can only be accessed from channelExecutor
- channelExecutor.executeLater(new Runnable() {
+ final class StatsFetcher implements Runnable {
@Override
public void run() {
ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
@@ -314,7 +266,10 @@ final class ManagedChannelImpl extends ManagedChannel implements
builder.setSubchannels(children);
ret.set(builder.build());
}
- }).drain();
+ }
+
+ // subchannels and oobchannels can only be accessed from channelExecutor
+ channelExecutor.executeLater(new StatsFetcher()).drain();
return ret;
}
@@ -463,7 +418,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
}
- private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
+ private final class ChannelTransportProvider implements ClientTransportProvider {
@Override
public ClientTransport get(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
@@ -473,12 +428,14 @@ final class ManagedChannelImpl extends ManagedChannel implements
return delayedTransport;
}
if (pickerCopy == null) {
- channelExecutor.executeLater(new Runnable() {
- @Override
- public void run() {
- exitIdleMode();
- }
- }).drain();
+ final class ExitIdleModeForTransport implements Runnable {
+ @Override
+ public void run() {
+ exitIdleMode();
+ }
+ }
+
+ channelExecutor.executeLater(new ExitIdleModeForTransport()).drain();
return delayedTransport;
}
// There is no need to reschedule the idle timer here.
@@ -507,11 +464,21 @@ final class ManagedChannelImpl extends ManagedChannel implements
final Metadata headers,
final Context context) {
checkState(retryEnabled, "retry should be enabled");
- return new RetriableStream<ReqT>(
- method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
- getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(),
- callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY),
- throttle) {
+ final class RetryStream extends RetriableStream<ReqT> {
+ RetryStream() {
+ super(
+ method,
+ headers,
+ channelBufferUsed,
+ perRpcBufferLimit,
+ channelBufferLimit,
+ getCallExecutor(callOptions),
+ transportFactory.getScheduledExecutorService(),
+ callOptions.getOption(RETRY_POLICY_KEY),
+ callOptions.getOption(HEDGING_POLICY_KEY),
+ throttle);
+ }
+
@Override
Status prestart() {
return uncommittedRetriableStreamsRegistry.add(this);
@@ -534,9 +501,13 @@ final class ManagedChannelImpl extends ManagedChannel implements
context.detach(origContext);
}
}
- };
+ }
+
+ return new RetryStream();
}
- };
+ }
+
+ private final ClientTransportProvider transportProvider = new ChannelTransportProvider();
private final Rescheduler idleTimer;
@@ -552,8 +523,16 @@ final class ManagedChannelImpl extends ManagedChannel implements
this.nameResolverFactory = builder.getNameResolverFactory();
this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
+ this.timeProvider = checkNotNull(timeProvider, "timeProvider");
+ maxTraceEvents = builder.maxTraceEvents;
+ if (maxTraceEvents > 0) {
+ long currentTimeNanos = timeProvider.currentTimeNanos();
+ channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel");
+ } else {
+ channelTracer = null;
+ }
if (builder.loadBalancerFactory == null) {
- this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory();
+ this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(channelTracer, timeProvider);
} else {
this.loadBalancerFactory = builder.loadBalancerFactory;
}
@@ -568,7 +547,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
serviceConfigInterceptor = new ServiceConfigInterceptor(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
- Channel channel = new RealChannel();
+ Channel channel = new RealChannel(nameResolver.getServiceAuthority());
channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
if (builder.binlog != null) {
channel = builder.binlog.wrapChannel(channel);
@@ -606,24 +585,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
this.channelBufferLimit = builder.retryBufferSize;
this.perRpcBufferLimit = builder.perRpcBufferLimit;
- this.timeProvider = checkNotNull(timeProvider, "timeProvider");
- this.callTracerFactory = new CallTracer.Factory() {
+ final class ChannelCallTracerFactory implements CallTracer.Factory {
@Override
public CallTracer create() {
return new CallTracer(timeProvider);
}
- };
+ }
+
+ this.callTracerFactory = new ChannelCallTracerFactory();
channelCallTracer = callTracerFactory.create();
this.channelz = checkNotNull(builder.channelz);
channelz.addRootChannel(this);
- maxTraceEvents = builder.maxTraceEvents;
- if (maxTraceEvents > 0) {
- long currentTimeNanos = timeProvider.currentTimeNanos();
- channelTracer = new ChannelTracer(builder.maxTraceEvents, currentTimeNanos, "Channel");
- } else {
- channelTracer = null;
- }
logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
}
@@ -687,7 +660,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
// delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
// delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
// channelExecutor's queue and should not be blocked, so we do not drain() immediately here.
- channelExecutor.executeLater(new Runnable() {
+ final class Shutdown implements Runnable {
@Override
public void run() {
if (channelTracer != null) {
@@ -699,15 +672,19 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
channelStateManager.gotoState(SHUTDOWN);
}
- });
+ }
+
+ channelExecutor.executeLater(new Shutdown());
uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
- channelExecutor.executeLater(new Runnable() {
- @Override
- public void run() {
- cancelIdleTimer(/* permanent= */ true);
- }
- }).drain();
+ final class CancelIdleTimer implements Runnable {
+ @Override
+ public void run() {
+ cancelIdleTimer(/* permanent= */ true);
+ }
+ }
+
+ channelExecutor.executeLater(new CancelIdleTimer()).drain();
logger.log(Level.FINE, "[{0}] Shutting down", getLogId());
return this;
}
@@ -722,16 +699,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId());
shutdown();
uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
- channelExecutor.executeLater(new Runnable() {
- @Override
- public void run() {
- if (shutdownNowed) {
- return;
- }
- shutdownNowed = true;
- maybeShutdownNowSubchannels();
+ final class ShutdownNow implements Runnable {
+ @Override
+ public void run() {
+ if (shutdownNowed) {
+ return;
}
- }).drain();
+ shutdownNowed = true;
+ maybeShutdownNowSubchannels();
+ }
+ }
+
+ channelExecutor.executeLater(new ShutdownNow()).drain();
return this;
}
@@ -745,16 +724,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
panicMode = true;
cancelIdleTimer(/* permanent= */ true);
shutdownNameResolverAndLoadBalancer(false);
- SubchannelPicker newPicker = new SubchannelPicker() {
- final PickResult panicPickResult =
+ final class PanicSubchannelPicker extends SubchannelPicker {
+ private final PickResult panicPickResult =
PickResult.withDrop(
Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
+
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return panicPickResult;
}
- };
- updateSubchannelPicker(newPicker);
+ }
+
+ updateSubchannelPicker(new PanicSubchannelPicker());
if (channelTracer != null) {
channelTracer.reportEvent(
new ChannelTrace.Event.Builder()
@@ -810,6 +791,14 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
private class RealChannel extends Channel {
+ // Set when the NameResolver is initially created. When we create a new NameResolver for the
+ // same target, the new instance must have the same value.
+ private final String authority;
+
+ private RealChannel(String authority) {
+ this.authority = checkNotNull(authority, "authority");
+ }
+
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
@@ -828,8 +817,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override
public String authority() {
- String authority = nameResolver.getServiceAuthority();
- return checkNotNull(authority, "authority");
+ return authority;
}
}
@@ -856,60 +844,61 @@ final class ManagedChannelImpl extends ManagedChannel implements
public ConnectivityState getState(boolean requestConnection) {
ConnectivityState savedChannelState = channelStateManager.getState();
if (requestConnection && savedChannelState == IDLE) {
- channelExecutor.executeLater(
- new Runnable() {
- @Override
- public void run() {
- exitIdleMode();
- if (subchannelPicker != null) {
- subchannelPicker.requestConnection();
- }
- }
- }).drain();
+ final class RequestConnection implements Runnable {
+ @Override
+ public void run() {
+ exitIdleMode();
+ if (subchannelPicker != null) {
+ subchannelPicker.requestConnection();
+ }
+ }
+ }
+
+ channelExecutor.executeLater(new RequestConnection()).drain();
}
return savedChannelState;
}
@Override
public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
- channelExecutor.executeLater(
- new Runnable() {
- @Override
- public void run() {
- channelStateManager.notifyWhenStateChanged(callback, executor, source);
- }
- }).drain();
+ final class NotifyStateChanged implements Runnable {
+ @Override
+ public void run() {
+ channelStateManager.notifyWhenStateChanged(callback, executor, source);
+ }
+ }
+
+ channelExecutor.executeLater(new NotifyStateChanged()).drain();
}
@Override
public void resetConnectBackoff() {
- channelExecutor
- .executeLater(
- new Runnable() {
- @Override
- public void run() {
- if (shutdown.get()) {
- return;
- }
- if (nameResolverRefreshFuture != null) {
- checkState(nameResolverStarted, "name resolver must be started");
- cancelNameResolverBackoff();
- nameResolver.refresh();
- }
- for (InternalSubchannel subchannel : subchannels) {
- subchannel.resetConnectBackoff();
- }
- for (OobChannel oobChannel : oobChannels) {
- oobChannel.resetConnectBackoff();
- }
- }
- })
- .drain();
+ final class ResetConnectBackoff implements Runnable {
+ @Override
+ public void run() {
+ if (shutdown.get()) {
+ return;
+ }
+ if (nameResolverRefreshFuture != null) {
+ checkState(nameResolverStarted, "name resolver must be started");
+ cancelNameResolverBackoff();
+ nameResolver.refresh();
+ }
+ for (InternalSubchannel subchannel : subchannels) {
+ subchannel.resetConnectBackoff();
+ }
+ for (OobChannel oobChannel : oobChannels) {
+ oobChannel.resetConnectBackoff();
+ }
+ }
+ }
+
+ channelExecutor.executeLater(new ResetConnectBackoff()).drain();
}
@Override
public void enterIdle() {
- class PrepareToLoseNetworkRunnable implements Runnable {
+ final class PrepareToLoseNetworkRunnable implements Runnable {
@Override
public void run() {
if (shutdown.get() || lbHelper == null) {
@@ -933,7 +922,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
final Object lock = new Object();
@GuardedBy("lock")
- Collection<ClientStream> uncommittedRetriableStreams = new HashSet<ClientStream>();
+ Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
@GuardedBy("lock")
Status shutdownStatus;
@@ -996,7 +985,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
shutdownStatusCopy = shutdownStatus;
// Because retriable transport is long-lived, we take this opportunity to down-size the
// hashmap.
- uncommittedRetriableStreams = new HashSet<ClientStream>();
+ uncommittedRetriableStreams = new HashSet<>();
}
}
@@ -1034,6 +1023,36 @@ final class ManagedChannelImpl extends ManagedChannel implements
if (maxTraceEvents > 0) {
subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel");
}
+
+ final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
+ // All callbacks are run in channelExecutor
+ @Override
+ void onTerminated(InternalSubchannel is) {
+ subchannels.remove(is);
+ channelz.removeSubchannel(is);
+ maybeTerminateChannel();
+ }
+
+ @Override
+ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
+ handleInternalSubchannelState(newState);
+ // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
+ if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) {
+ lb.handleSubchannelState(subchannel, newState);
+ }
+ }
+
+ @Override
+ void onInUse(InternalSubchannel is) {
+ inUseStateAggregator.updateObjectInUse(is, true);
+ }
+
+ @Override
+ void onNotInUse(InternalSubchannel is) {
+ inUseStateAggregator.updateObjectInUse(is, false);
+ }
+ }
+
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroups,
authority(),
@@ -1043,34 +1062,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
transportFactory.getScheduledExecutorService(),
stopwatchSupplier,
channelExecutor,
- new InternalSubchannel.Callback() {
- // All callbacks are run in channelExecutor
- @Override
- void onTerminated(InternalSubchannel is) {
- subchannels.remove(is);
- channelz.removeSubchannel(is);
- maybeTerminateChannel();
- }
-
- @Override
- void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
- handleInternalSubchannelState(newState);
- // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
- if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) {
- lb.handleSubchannelState(subchannel, newState);
- }
- }
-
- @Override
- void onInUse(InternalSubchannel is) {
- inUseStateAggregator.updateObjectInUse(is, true);
- }
-
- @Override
- void onNotInUse(InternalSubchannel is) {
- inUseStateAggregator.updateObjectInUse(is, false);
- }
- },
+ new ManagedInternalSubchannelCallback(),
channelz,
callTracerFactory.create(),
subchannelTracer,
@@ -1087,24 +1079,27 @@ final class ManagedChannelImpl extends ManagedChannel implements
subchannel.subchannel = internalSubchannel;
logger.log(Level.FINE, "[{0}] {1} created for {2}",
new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups});
- runSerialized(new Runnable() {
- @Override
- public void run() {
- if (terminating) {
- // Because runSerialized() doesn't guarantee the runnable has been executed upon when
- // returning, the subchannel may still be returned to the balancer without being
- // shutdown even if "terminating" is already true. The subchannel will not be used in
- // this case, because delayed transport has terminated when "terminating" becomes
- // true, and no more requests will be sent to balancer beyond this point.
- internalSubchannel.shutdown(SHUTDOWN_STATUS);
- }
- if (!terminated) {
- // If channel has not terminated, it will track the subchannel and block termination
- // for it.
- subchannels.add(internalSubchannel);
- }
+
+ final class AddSubchannel implements Runnable {
+ @Override
+ public void run() {
+ if (terminating) {
+ // Because runSerialized() doesn't guarantee the runnable has been executed upon when
+ // returning, the subchannel may still be returned to the balancer without being
+ // shutdown even if "terminating" is already true. The subchannel will not be used in
+ // this case, because delayed transport has terminated when "terminating" becomes
+ // true, and no more requests will be sent to balancer beyond this point.
+ internalSubchannel.shutdown(SHUTDOWN_STATUS);
+ }
+ if (!terminated) {
+ // If channel has not terminated, it will track the subchannel and block termination
+ // for it.
+ subchannels.add(internalSubchannel);
}
- });
+ }
+ }
+
+ runSerialized(new AddSubchannel());
return subchannel;
}
@@ -1113,30 +1108,30 @@ final class ManagedChannelImpl extends ManagedChannel implements
final ConnectivityState newState, final SubchannelPicker newPicker) {
checkNotNull(newState, "newState");
checkNotNull(newPicker, "newPicker");
-
- runSerialized(
- new Runnable() {
- @Override
- public void run() {
- if (LbHelperImpl.this != lbHelper) {
- return;
- }
- updateSubchannelPicker(newPicker);
- // It's not appropriate to report SHUTDOWN state from lb.
- // Ignore the case of newState == SHUTDOWN for now.
- if (newState != SHUTDOWN) {
- if (channelTracer != null) {
- channelTracer.reportEvent(
- new ChannelTrace.Event.Builder()
- .setDescription("Entering " + newState + " state")
- .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
- .setTimestampNanos(timeProvider.currentTimeNanos())
- .build());
- }
- channelStateManager.gotoState(newState);
- }
+ final class UpdateBalancingState implements Runnable {
+ @Override
+ public void run() {
+ if (LbHelperImpl.this != lbHelper) {
+ return;
+ }
+ updateSubchannelPicker(newPicker);
+ // It's not appropriate to report SHUTDOWN state from lb.
+ // Ignore the case of newState == SHUTDOWN for now.
+ if (newState != SHUTDOWN) {
+ if (channelTracer != null) {
+ channelTracer.reportEvent(
+ new ChannelTrace.Event.Builder()
+ .setDescription("Entering " + newState + " state")
+ .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
+ .setTimestampNanos(timeProvider.currentTimeNanos())
+ .build());
}
- });
+ channelStateManager.gotoState(newState);
+ }
+ }
+ }
+
+ runSerialized(new UpdateBalancingState());
}
@Override
@@ -1169,26 +1164,28 @@ final class ManagedChannelImpl extends ManagedChannel implements
.build());
subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel");
}
+ final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
+ @Override
+ void onTerminated(InternalSubchannel is) {
+ oobChannels.remove(oobChannel);
+ channelz.removeSubchannel(is);
+ oobChannel.handleSubchannelTerminated();
+ maybeTerminateChannel();
+ }
+
+ @Override
+ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
+ handleInternalSubchannelState(newState);
+ oobChannel.handleSubchannelStateChange(newState);
+ }
+ }
+
final InternalSubchannel internalSubchannel = new InternalSubchannel(
Collections.singletonList(addressGroup),
authority, userAgent, backoffPolicyProvider, transportFactory,
transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
// All callback methods are run from channelExecutor
- new InternalSubchannel.Callback() {
- @Override
- void onTerminated(InternalSubchannel is) {
- oobChannels.remove(oobChannel);
- channelz.removeSubchannel(is);
- oobChannel.handleSubchannelTerminated();
- maybeTerminateChannel();
- }
-
- @Override
- void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
- handleInternalSubchannelState(newState);
- oobChannel.handleSubchannelStateChange(newState);
- }
- },
+ new ManagedOobChannelCallback(),
channelz,
callTracerFactory.create(),
subchannelTracer,
@@ -1204,19 +1201,21 @@ final class ManagedChannelImpl extends ManagedChannel implements
channelz.addSubchannel(oobChannel);
channelz.addSubchannel(internalSubchannel);
oobChannel.setSubchannel(internalSubchannel);
- runSerialized(new Runnable() {
- @Override
- public void run() {
- if (terminating) {
- oobChannel.shutdown();
- }
- if (!terminated) {
- // If channel has not terminated, it will track the subchannel and block termination
- // for it.
- oobChannels.add(oobChannel);
- }
+ final class AddOobChannel implements Runnable {
+ @Override
+ public void run() {
+ if (terminating) {
+ oobChannel.shutdown();
+ }
+ if (!terminated) {
+ // If channel has not terminated, it will track the subchannel and block termination
+ // for it.
+ oobChannels.add(oobChannel);
}
- });
+ }
+ }
+
+ runSerialized(new AddOobChannel());
return oobChannel;
}
@@ -1325,41 +1324,40 @@ final class ManagedChannelImpl extends ManagedChannel implements
.build());
haveBackends = false;
}
- channelExecutor
- .executeLater(
- new Runnable() {
- @Override
- public void run() {
- // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
- if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
- return;
- }
- helper.lb.handleNameResolutionError(error);
- if (nameResolverRefreshFuture != null) {
- // The name resolver may invoke onError multiple times, but we only want to
- // schedule one backoff attempt
- // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
- // want to reset the backoff interval upon repeated onError() calls
- return;
- }
- if (nameResolverBackoffPolicy == null) {
- nameResolverBackoffPolicy = backoffPolicyProvider.get();
- }
- long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
- if (logger.isLoggable(Level.FINE)) {
- logger.log(
- Level.FINE,
- "[{0}] Scheduling DNS resolution backoff for {1} ns",
- new Object[] {logId, delayNanos});
- }
- nameResolverRefresh = new NameResolverRefresh();
- nameResolverRefreshFuture =
- transportFactory
- .getScheduledExecutorService()
- .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS);
- }
- })
- .drain();
+ final class NameResolverErrorHandler implements Runnable {
+ @Override
+ public void run() {
+ // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
+ if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
+ return;
+ }
+ helper.lb.handleNameResolutionError(error);
+ if (nameResolverRefreshFuture != null) {
+ // The name resolver may invoke onError multiple times, but we only want to
+ // schedule one backoff attempt
+ // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
+ // want to reset the backoff interval upon repeated onError() calls
+ return;
+ }
+ if (nameResolverBackoffPolicy == null) {
+ nameResolverBackoffPolicy = backoffPolicyProvider.get();
+ }
+ long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(
+ Level.FINE,
+ "[{0}] Scheduling DNS resolution backoff for {1} ns",
+ new Object[] {logId, delayNanos});
+ }
+ nameResolverRefresh = new NameResolverRefresh();
+ nameResolverRefreshFuture =
+ transportFactory
+ .getScheduledExecutorService()
+ .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ channelExecutor.executeLater(new NameResolverErrorHandler()).drain();
}
}
@@ -1419,14 +1417,16 @@ final class ManagedChannelImpl extends ManagedChannel implements
// TODO(zhangkun83): consider a better approach
// (https://github.com/grpc/grpc-java/issues/2562).
if (!terminating) {
+ final class ShutdownSubchannel implements Runnable {
+ @Override
+ public void run() {
+ subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
+ }
+ }
+
delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule(
new LogExceptionRunnable(
- new Runnable() {
- @Override
- public void run() {
- subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
- }
- }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ new ShutdownSubchannel()), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
return;
}
}
@@ -1463,4 +1463,62 @@ final class ManagedChannelImpl extends ManagedChannel implements
.add("target", target)
.toString();
}
+
+ private final class PanicChannelExecutor extends ChannelExecutor {
+ @Override
+ void handleUncaughtThrowable(Throwable t) {
+ super.handleUncaughtThrowable(t);
+ panic(t);
+ }
+ }
+
+ /**
+ * Called from channelExecutor.
+ */
+ private final class DelayedTransportListener implements ManagedClientTransport.Listener {
+ @Override
+ public void transportShutdown(Status s) {
+ checkState(shutdown.get(), "Channel must have been shut down");
+ }
+
+ @Override
+ public void transportReady() {
+ // Don't care
+ }
+
+ @Override
+ public void transportInUse(final boolean inUse) {
+ inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
+ }
+
+ @Override
+ public void transportTerminated() {
+ checkState(shutdown.get(), "Channel must have been shut down");
+ terminating = true;
+ shutdownNameResolverAndLoadBalancer(false);
+ // No need to call channelStateManager since we are already in SHUTDOWN state.
+ // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them
+ // here.
+ maybeShutdownNowSubchannels();
+ maybeTerminateChannel();
+ }
+ }
+
+ /**
+ * Must be accessed from channelExecutor.
+ */
+ private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
+ @Override
+ void handleInUse() {
+ exitIdleMode();
+ }
+
+ @Override
+ void handleNotInUse() {
+ if (shutdown.get()) {
+ return;
+ }
+ rescheduleIdleTimer();
+ }
+ }
}
diff --git a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java
index 48e00665c..56548b282 100644
--- a/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java
+++ b/core/src/main/java/io/grpc/internal/MetadataApplierImpl.java
@@ -20,7 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import io.grpc.CallCredentials.MetadataApplier;
+import io.grpc.CallCredentials2.MetadataApplier;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.Metadata;
@@ -29,7 +29,7 @@ import io.grpc.Status;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
-final class MetadataApplierImpl implements MetadataApplier {
+final class MetadataApplierImpl extends MetadataApplier {
private final ClientTransport transport;
private final MethodDescriptor<?, ?> method;
private final Metadata origHeaders;
diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java
new file mode 100644
index 000000000..e2a9720d4
--- /dev/null
+++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import com.google.common.base.MoreObjects;
+import io.grpc.Attributes;
+import io.grpc.ConnectivityState;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.ExperimentalApi;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelPicker;
+import io.grpc.LoadBalancer;
+import io.grpc.ManagedChannel;
+import io.grpc.NameResolver;
+import java.util.List;
+
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
+public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper {
+ /**
+ * Returns the underlying helper.
+ */
+ protected abstract LoadBalancer.Helper delegate();
+
+ @Override
+ public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
+ return delegate().createSubchannel(addrs, attrs);
+ }
+
+ @Override
+ public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
+ return delegate().createSubchannel(addrs, attrs);
+ }
+
+ @Override
+ public void updateSubchannelAddresses(
+ Subchannel subchannel, EquivalentAddressGroup addrs) {
+ delegate().updateSubchannelAddresses(subchannel, addrs);
+ }
+
+ @Override
+ public void updateSubchannelAddresses(
+ Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
+ delegate().updateSubchannelAddresses(subchannel, addrs);
+ }
+
+ @Override
+ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
+ return delegate().createOobChannel(eag, authority);
+ }
+
+ @Override
+ public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
+ delegate().updateOobChannelAddresses(channel, eag);
+ }
+
+ @Override
+ public void updateBalancingState(
+ ConnectivityState newState, SubchannelPicker newPicker) {
+ delegate().updateBalancingState(newState, newPicker);
+ }
+
+ @Override
+ public void runSerialized(Runnable task) {
+ delegate().runSerialized(task);
+ }
+
+ @Override
+ public NameResolver.Factory getNameResolverFactory() {
+ return delegate().getNameResolverFactory();
+ }
+
+ @Override
+ public String getAuthority() {
+ return delegate().getAuthority();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
+ }
+}
diff --git a/core/src/test/java/io/grpc/ForwardingTestUtil.java b/core/src/test/java/io/grpc/ForwardingTestUtil.java
index ce9ca5eb7..e9f1c5b23 100644
--- a/core/src/test/java/io/grpc/ForwardingTestUtil.java
+++ b/core/src/test/java/io/grpc/ForwardingTestUtil.java
@@ -28,6 +28,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Collection;
+import javax.annotation.Nullable;
/**
* A util class to help test forwarding classes.
@@ -35,9 +36,8 @@ import java.util.Collection;
public final class ForwardingTestUtil {
/**
* Use reflection to perform a basic sanity test. The forwarding class should forward all public
- * methods to the delegate, except for those in skippedMethods.
- * This does NOT verify that arguments or return values are forwarded properly. It only alerts
- * the developer if a forward method is missing.
+ * methods to the delegate, except for those in skippedMethods. This does NOT verify that
+ * arguments or return values are forwarded properly.
*
* @param delegateClass The class whose methods should be forwarded.
* @param mockDelegate The mockito mock of the delegate class.
@@ -49,6 +49,34 @@ public final class ForwardingTestUtil {
T mockDelegate,
T forwarder,
Collection<Method> skippedMethods) throws Exception {
+ testMethodsForwarded(
+ delegateClass, mockDelegate, forwarder, skippedMethods,
+ new ArgumentProvider() {
+ @Override
+ public Object get(Class<?> clazz) {
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Use reflection to perform a basic sanity test. The forwarding class should forward all public
+ * methods to the delegate, except for those in skippedMethods. This does NOT verify that return
+ * values are forwarded properly, and can only verify the propagation of arguments for which
+ * {@code argProvider} returns distinctive non-null values.
+ *
+ * @param delegateClass The class whose methods should be forwarded.
+ * @param mockDelegate The mockito mock of the delegate class.
+ * @param forwarder The forwarder object that forwards to the mockDelegate.
+ * @param skippedMethods A collection of methods that are skipped by the test.
+ * @param argProvider provides argument to be passed to tested forwarding methods.
+ */
+ public static <T> void testMethodsForwarded(
+ Class<T> delegateClass,
+ T mockDelegate,
+ T forwarder,
+ Collection<Method> skippedMethods,
+ ArgumentProvider argProvider) throws Exception {
assertTrue(mockingDetails(mockDelegate).isMock());
assertFalse(mockingDetails(forwarder).isMock());
@@ -61,7 +89,9 @@ public final class ForwardingTestUtil {
Class<?>[] argTypes = method.getParameterTypes();
Object[] args = new Object[argTypes.length];
for (int i = 0; i < argTypes.length; i++) {
- args[i] = Defaults.defaultValue(argTypes[i]);
+ if ((args[i] = argProvider.get(argTypes[i])) == null) {
+ args[i] = Defaults.defaultValue(argTypes[i]);
+ }
}
method.invoke(forwarder, args);
try {
@@ -85,4 +115,20 @@ public final class ForwardingTestUtil {
assertEquals("Method toString() was not forwarded properly", expected, actual);
}
}
+
+ /**
+ * Provides arguments for forwarded methods tested in {@link #testMethodsForwarded}.
+ */
+ public interface ArgumentProvider {
+ /**
+ * Return an instance of the given class to be used as an argument passed to one method call.
+ * If one method has multiple arguments with the same type, each occurrence will call this
+ * method once. It is recommended that each invocation returns a distinctive object for the
+ * same type, in order to verify that arguments are passed by the tested class correctly.
+ *
+ * @return a value to be passed as an argument. If {@code null}, {@link Default#defaultValue}
+ * will be used.
+ */
+ @Nullable Object get(Class<?> clazz);
+ }
}
diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java
index 1542a6fac..71c574298 100644
--- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java
+++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java
@@ -19,11 +19,14 @@ package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
+import io.grpc.InternalChannelz;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
@@ -52,7 +55,8 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class AutoConfiguredLoadBalancerFactoryTest {
- private final AutoConfiguredLoadBalancerFactory lbf = new AutoConfiguredLoadBalancerFactory();
+ private final AutoConfiguredLoadBalancerFactory lbf =
+ new AutoConfiguredLoadBalancerFactory(null, null);
@Test
public void newLoadBalancer_isAuto() {
@@ -258,6 +262,80 @@ public class AutoConfiguredLoadBalancerFactoryTest {
}
}
+ @Test
+ public void channelTracing_lbPolicyChanged() {
+ ChannelTracer channelTracer = new ChannelTracer(100, 1000, "dummy_type");
+ TimeProvider timeProvider = new TimeProvider() {
+ @Override
+ public long currentTimeNanos() {
+ return 101;
+ }
+ };
+
+ InternalChannelz.ChannelStats.Builder statsBuilder
+ = new InternalChannelz.ChannelStats.Builder();
+ channelTracer.updateBuilder(statsBuilder);
+ List<EquivalentAddressGroup> servers =
+ Collections.singletonList(
+ new EquivalentAddressGroup(new SocketAddress(){}, Attributes.EMPTY));
+ Helper helper = new TestHelper() {
+ @Override
+ public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
+ return new TestSubchannel(addrs, attrs);
+ }
+
+ @Override
+ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
+ return mock(ManagedChannel.class, RETURNS_DEEP_STUBS);
+ }
+
+ @Override
+ public String getAuthority() {
+ return "fake_authority";
+ }
+
+ @Override
+ public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
+ // noop
+ }
+ };
+ int prevNumOfEvents = statsBuilder.build().channelTrace.events.size();
+
+ LoadBalancer lb =
+ new AutoConfiguredLoadBalancerFactory(channelTracer, timeProvider).newLoadBalancer(helper);
+ lb.handleResolvedAddressGroups(servers, Attributes.EMPTY);
+ channelTracer.updateBuilder(statsBuilder);
+ assertThat(statsBuilder.build().channelTrace.events).hasSize(prevNumOfEvents);
+
+ Map<String, Object> serviceConfig = new HashMap<String, Object>();
+ serviceConfig.put("loadBalancingPolicy", "round_robin");
+ lb.handleResolvedAddressGroups(servers,
+ Attributes.newBuilder()
+ .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build());
+ channelTracer.updateBuilder(statsBuilder);
+ assertThat(statsBuilder.build().channelTrace.events).hasSize(prevNumOfEvents + 1);
+ assertThat(statsBuilder.build().channelTrace.events.get(prevNumOfEvents).description)
+ .isEqualTo("Load balancer changed from PickFirstBalancer to RoundRobinLoadBalancer");
+ prevNumOfEvents = statsBuilder.build().channelTrace.events.size();
+
+ serviceConfig.put("loadBalancingPolicy", "round_robin");
+ lb.handleResolvedAddressGroups(servers,
+ Attributes.newBuilder()
+ .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build());
+ channelTracer.updateBuilder(statsBuilder);
+ assertThat(statsBuilder.build().channelTrace.events).hasSize(prevNumOfEvents);
+
+ servers = Collections.singletonList(new EquivalentAddressGroup(
+ new SocketAddress(){},
+ Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()));
+ lb.handleResolvedAddressGroups(servers, Attributes.EMPTY);
+
+ channelTracer.updateBuilder(statsBuilder);
+ assertThat(statsBuilder.build().channelTrace.events).hasSize(prevNumOfEvents + 1);
+ assertThat(statsBuilder.build().channelTrace.events.get(prevNumOfEvents).description)
+ .isEqualTo("Load balancer changed from RoundRobinLoadBalancer to GrpclbLoadBalancer");
+ }
+
public static class ForwardingLoadBalancer extends LoadBalancer {
private final LoadBalancer delegate;
diff --git a/core/src/test/java/io/grpc/internal/CallCredentials2ApplyingTest.java b/core/src/test/java/io/grpc/internal/CallCredentials2ApplyingTest.java
new file mode 100644
index 000000000..fd2b2c18f
--- /dev/null
+++ b/core/src/test/java/io/grpc/internal/CallCredentials2ApplyingTest.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2016 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.grpc.Attributes;
+import io.grpc.CallCredentials.RequestInfo;
+import io.grpc.CallCredentials2;
+import io.grpc.CallCredentials2.MetadataApplier;
+import io.grpc.CallOptions;
+import io.grpc.IntegerMarshaller;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.SecurityLevel;
+import io.grpc.Status;
+import io.grpc.StringMarshaller;
+import java.net.SocketAddress;
+import java.util.concurrent.Executor;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Unit test for {@link CallCredentials2} applying functionality implemented by {@link
+ * CallCredentialsApplyingTransportFactory} and {@link MetadataApplierImpl}.
+ */
+@RunWith(JUnit4.class)
+public class CallCredentials2ApplyingTest {
+ @Mock
+ private ClientTransportFactory mockTransportFactory;
+
+ @Mock
+ private ConnectionClientTransport mockTransport;
+
+ @Mock
+ private ClientStream mockStream;
+
+ @Mock
+ private CallCredentials2 mockCreds;
+
+ @Mock
+ private Executor mockExecutor;
+
+ @Mock
+ private SocketAddress address;
+
+ private static final String AUTHORITY = "testauthority";
+ private static final String USER_AGENT = "testuseragent";
+ private static final Attributes.Key<String> ATTR_KEY = Attributes.Key.create("somekey");
+ private static final String ATTR_VALUE = "somevalue";
+ private static final MethodDescriptor<String, Integer> method =
+ MethodDescriptor.<String, Integer>newBuilder()
+ .setType(MethodDescriptor.MethodType.UNKNOWN)
+ .setFullMethodName("service/method")
+ .setRequestMarshaller(new StringMarshaller())
+ .setResponseMarshaller(new IntegerMarshaller())
+ .build();
+ private static final Metadata.Key<String> ORIG_HEADER_KEY =
+ Metadata.Key.of("header1", Metadata.ASCII_STRING_MARSHALLER);
+ private static final String ORIG_HEADER_VALUE = "some original header value";
+ private static final Metadata.Key<String> CREDS_KEY =
+ Metadata.Key.of("test-creds", Metadata.ASCII_STRING_MARSHALLER);
+ private static final String CREDS_VALUE = "some credentials";
+
+ private final Metadata origHeaders = new Metadata();
+ private ForwardingConnectionClientTransport transport;
+ private CallOptions callOptions;
+
+ @Before
+ public void setUp() {
+ ClientTransportFactory.ClientTransportOptions clientTransportOptions =
+ new ClientTransportFactory.ClientTransportOptions()
+ .setAuthority(AUTHORITY)
+ .setUserAgent(USER_AGENT);
+
+ MockitoAnnotations.initMocks(this);
+ origHeaders.put(ORIG_HEADER_KEY, ORIG_HEADER_VALUE);
+ when(mockTransportFactory.newClientTransport(address, clientTransportOptions))
+ .thenReturn(mockTransport);
+ when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
+ .thenReturn(mockStream);
+ ClientTransportFactory transportFactory = new CallCredentialsApplyingTransportFactory(
+ mockTransportFactory, mockExecutor);
+ transport = (ForwardingConnectionClientTransport)
+ transportFactory.newClientTransport(address, clientTransportOptions);
+ callOptions = CallOptions.DEFAULT.withCallCredentials(mockCreds);
+ verify(mockTransportFactory).newClientTransport(address, clientTransportOptions);
+ assertSame(mockTransport, transport.delegate());
+ }
+
+ @Test
+ public void parameterPropagation_base() {
+ Attributes transportAttrs = Attributes.newBuilder().set(ATTR_KEY, ATTR_VALUE).build();
+ when(mockTransport.getAttributes()).thenReturn(transportAttrs);
+
+ transport.newStream(method, origHeaders, callOptions);
+
+ ArgumentCaptor<RequestInfo> infoCaptor = ArgumentCaptor.forClass(null);
+ verify(mockCreds).applyRequestMetadata(
+ infoCaptor.capture(), same(mockExecutor), any(MetadataApplier.class));
+ RequestInfo info = infoCaptor.getValue();
+ assertSame(method, info.getMethodDescriptor());
+ Attributes attrs = info.getTransportAttrs();
+ assertSame(ATTR_VALUE, info.getTransportAttrs().get(ATTR_KEY));
+ assertSame(AUTHORITY, info.getAuthority());
+ assertSame(SecurityLevel.NONE, info.getSecurityLevel());
+ }
+
+ @Test
+ public void parameterPropagation_transportSetSecurityLevel() {
+ Attributes transportAttrs = Attributes.newBuilder()
+ .set(ATTR_KEY, ATTR_VALUE)
+ .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.INTEGRITY)
+ .build();
+ when(mockTransport.getAttributes()).thenReturn(transportAttrs);
+
+ transport.newStream(method, origHeaders, callOptions);
+
+ ArgumentCaptor<RequestInfo> infoCaptor = ArgumentCaptor.forClass(null);
+ verify(mockCreds).applyRequestMetadata(
+ infoCaptor.capture(), same(mockExecutor), any(MetadataApplier.class));
+ RequestInfo info = infoCaptor.getValue();
+ assertSame(method, info.getMethodDescriptor());
+ assertSame(ATTR_VALUE, info.getTransportAttrs().get(ATTR_KEY));
+ assertSame(AUTHORITY, info.getAuthority());
+ assertSame(SecurityLevel.INTEGRITY, info.getSecurityLevel());
+ }
+
+ @Test
+ public void parameterPropagation_callOptionsSetAuthority() {
+ Attributes transportAttrs = Attributes.newBuilder()
+ .set(ATTR_KEY, ATTR_VALUE)
+ .build();
+ when(mockTransport.getAttributes()).thenReturn(transportAttrs);
+ Executor anotherExecutor = mock(Executor.class);
+
+ transport.newStream(method, origHeaders,
+ callOptions.withAuthority("calloptions-authority").withExecutor(anotherExecutor));
+
+ ArgumentCaptor<RequestInfo> infoCaptor = ArgumentCaptor.forClass(null);
+ verify(mockCreds).applyRequestMetadata(
+ infoCaptor.capture(), same(anotherExecutor), any(MetadataApplier.class));
+ RequestInfo info = infoCaptor.getValue();
+ assertSame(method, info.getMethodDescriptor());
+ assertSame(ATTR_VALUE, info.getTransportAttrs().get(ATTR_KEY));
+ assertEquals("calloptions-authority", info.getAuthority());
+ assertSame(SecurityLevel.NONE, info.getSecurityLevel());
+ }
+
+ @Test
+ public void credentialThrows() {
+ final RuntimeException ex = new RuntimeException();
+ when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);
+ doThrow(ex).when(mockCreds).applyRequestMetadata(
+ any(RequestInfo.class), same(mockExecutor), any(MetadataApplier.class));
+
+ FailingClientStream stream =
+ (FailingClientStream) transport.newStream(method, origHeaders, callOptions);
+
+ verify(mockTransport, never()).newStream(method, origHeaders, callOptions);
+ assertEquals(Status.Code.UNAUTHENTICATED, stream.getError().getCode());
+ assertSame(ex, stream.getError().getCause());
+ }
+
+ @Test
+ public void applyMetadata_inline() {
+ when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ MetadataApplier applier = (MetadataApplier) invocation.getArguments()[2];
+ Metadata headers = new Metadata();
+ headers.put(CREDS_KEY, CREDS_VALUE);
+ applier.apply(headers);
+ return null;
+ }
+ }).when(mockCreds).applyRequestMetadata(
+ any(RequestInfo.class), same(mockExecutor), any(MetadataApplier.class));
+
+ ClientStream stream = transport.newStream(method, origHeaders, callOptions);
+
+ verify(mockTransport).newStream(method, origHeaders, callOptions);
+ assertSame(mockStream, stream);
+ assertEquals(CREDS_VALUE, origHeaders.get(CREDS_KEY));
+ assertEquals(ORIG_HEADER_VALUE, origHeaders.get(ORIG_HEADER_KEY));
+ }
+
+ @Test
+ public void fail_inline() {
+ final Status error = Status.FAILED_PRECONDITION.withDescription("channel not secure for creds");
+ when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ MetadataApplier applier = (MetadataApplier) invocation.getArguments()[2];
+ applier.fail(error);
+ return null;
+ }
+ }).when(mockCreds).applyRequestMetadata(
+ any(RequestInfo.class), same(mockExecutor), any(MetadataApplier.class));
+
+ FailingClientStream stream =
+ (FailingClientStream) transport.newStream(method, origHeaders, callOptions);
+
+ verify(mockTransport, never()).newStream(method, origHeaders, callOptions);
+ assertSame(error, stream.getError());
+ }
+
+ @Test
+ public void applyMetadata_delayed() {
+ when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);
+
+ // Will call applyRequestMetadata(), which is no-op.
+ DelayedStream stream = (DelayedStream) transport.newStream(method, origHeaders, callOptions);
+
+ ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
+ verify(mockCreds).applyRequestMetadata(
+ any(RequestInfo.class), same(mockExecutor), applierCaptor.capture());
+ verify(mockTransport, never()).newStream(method, origHeaders, callOptions);
+
+ Metadata headers = new Metadata();
+ headers.put(CREDS_KEY, CREDS_VALUE);
+ applierCaptor.getValue().apply(headers);
+
+ verify(mockTransport).newStream(method, origHeaders, callOptions);
+ assertSame(mockStream, stream.getRealStream());
+ assertEquals(CREDS_VALUE, origHeaders.get(CREDS_KEY));
+ assertEquals(ORIG_HEADER_VALUE, origHeaders.get(ORIG_HEADER_KEY));
+ }
+
+ @Test
+ public void fail_delayed() {
+ when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);
+
+ // Will call applyRequestMetadata(), which is no-op.
+ DelayedStream stream = (DelayedStream) transport.newStream(method, origHeaders, callOptions);
+
+ ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
+ verify(mockCreds).applyRequestMetadata(
+ any(RequestInfo.class), same(mockExecutor), applierCaptor.capture());
+
+ Status error = Status.FAILED_PRECONDITION.withDescription("channel not secure for creds");
+ applierCaptor.getValue().fail(error);
+
+ verify(mockTransport, never()).newStream(method, origHeaders, callOptions);
+ FailingClientStream failingStream = (FailingClientStream) stream.getRealStream();
+ assertSame(error, failingStream.getError());
+ }
+
+ @Test
+ public void noCreds() {
+ callOptions = callOptions.withCallCredentials(null);
+ ClientStream stream = transport.newStream(method, origHeaders, callOptions);
+
+ verify(mockTransport).newStream(method, origHeaders, callOptions);
+ assertSame(mockStream, stream);
+ assertNull(origHeaders.get(CREDS_KEY));
+ assertEquals(ORIG_HEADER_VALUE, origHeaders.get(ORIG_HEADER_KEY));
+ }
+}
diff --git a/core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java b/core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java
index c6a9bfb43..a8ca1219b 100644
--- a/core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java
+++ b/core/src/test/java/io/grpc/internal/CallCredentialsApplyingTest.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.when;
import io.grpc.Attributes;
import io.grpc.CallCredentials;
-import io.grpc.CallCredentials.MetadataApplier;
import io.grpc.CallOptions;
import io.grpc.IntegerMarshaller;
import io.grpc.Metadata;
@@ -55,6 +54,7 @@ import org.mockito.stubbing.Answer;
* CallCredentialsApplyingTransportFactory} and {@link MetadataApplierImpl}.
*/
@RunWith(JUnit4.class)
+@Deprecated
public class CallCredentialsApplyingTest {
@Mock
private ClientTransportFactory mockTransportFactory;
@@ -127,7 +127,7 @@ public class CallCredentialsApplyingTest {
ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null);
verify(mockCreds).applyRequestMetadata(same(method), attrsCaptor.capture(), same(mockExecutor),
- any(MetadataApplier.class));
+ any(CallCredentials.MetadataApplier.class));
Attributes attrs = attrsCaptor.getValue();
assertSame(ATTR_VALUE, attrs.get(ATTR_KEY));
assertSame(AUTHORITY, attrs.get(CallCredentials.ATTR_AUTHORITY));
@@ -147,7 +147,7 @@ public class CallCredentialsApplyingTest {
ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null);
verify(mockCreds).applyRequestMetadata(same(method), attrsCaptor.capture(), same(mockExecutor),
- any(MetadataApplier.class));
+ any(CallCredentials.MetadataApplier.class));
Attributes attrs = attrsCaptor.getValue();
assertSame(ATTR_VALUE, attrs.get(ATTR_KEY));
assertEquals("transport-override-authority", attrs.get(CallCredentials.ATTR_AUTHORITY));
@@ -169,7 +169,7 @@ public class CallCredentialsApplyingTest {
ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null);
verify(mockCreds).applyRequestMetadata(same(method), attrsCaptor.capture(),
- same(anotherExecutor), any(MetadataApplier.class));
+ same(anotherExecutor), any(CallCredentials.MetadataApplier.class));
Attributes attrs = attrsCaptor.getValue();
assertSame(ATTR_VALUE, attrs.get(ATTR_KEY));
assertEquals("calloptions-authority", attrs.get(CallCredentials.ATTR_AUTHORITY));
@@ -181,7 +181,8 @@ public class CallCredentialsApplyingTest {
final RuntimeException ex = new RuntimeException();
when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);
doThrow(ex).when(mockCreds).applyRequestMetadata(
- same(method), any(Attributes.class), same(mockExecutor), any(MetadataApplier.class));
+ same(method), any(Attributes.class), same(mockExecutor),
+ any(CallCredentials.MetadataApplier.class));
FailingClientStream stream =
(FailingClientStream) transport.newStream(method, origHeaders, callOptions);
@@ -197,14 +198,15 @@ public class CallCredentialsApplyingTest {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
- MetadataApplier applier = (MetadataApplier) invocation.getArguments()[3];
+ CallCredentials.MetadataApplier applier =
+ (CallCredentials.MetadataApplier) invocation.getArguments()[3];
Metadata headers = new Metadata();
headers.put(CREDS_KEY, CREDS_VALUE);
applier.apply(headers);
return null;
}
}).when(mockCreds).applyRequestMetadata(same(method), any(Attributes.class),
- same(mockExecutor), any(MetadataApplier.class));
+ same(mockExecutor), any(CallCredentials.MetadataApplier.class));
ClientStream stream = transport.newStream(method, origHeaders, callOptions);
@@ -221,12 +223,13 @@ public class CallCredentialsApplyingTest {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
- MetadataApplier applier = (MetadataApplier) invocation.getArguments()[3];
+ CallCredentials.MetadataApplier applier =
+ (CallCredentials.MetadataApplier) invocation.getArguments()[3];
applier.fail(error);
return null;
}
}).when(mockCreds).applyRequestMetadata(same(method), any(Attributes.class),
- same(mockExecutor), any(MetadataApplier.class));
+ same(mockExecutor), any(CallCredentials.MetadataApplier.class));
FailingClientStream stream =
(FailingClientStream) transport.newStream(method, origHeaders, callOptions);
@@ -242,7 +245,7 @@ public class CallCredentialsApplyingTest {
// Will call applyRequestMetadata(), which is no-op.
DelayedStream stream = (DelayedStream) transport.newStream(method, origHeaders, callOptions);
- ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
+ ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
verify(mockCreds).applyRequestMetadata(same(method), any(Attributes.class),
same(mockExecutor), applierCaptor.capture());
verify(mockTransport, never()).newStream(method, origHeaders, callOptions);
@@ -264,7 +267,7 @@ public class CallCredentialsApplyingTest {
// Will call applyRequestMetadata(), which is no-op.
DelayedStream stream = (DelayedStream) transport.newStream(method, origHeaders, callOptions);
- ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
+ ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
verify(mockCreds).applyRequestMetadata(same(method), any(Attributes.class),
same(mockExecutor), applierCaptor.capture());
diff --git a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
index 44bde2222..3f7adfa57 100644
--- a/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
+++ b/core/src/test/java/io/grpc/internal/DnsNameResolverTest.java
@@ -18,6 +18,7 @@ package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@@ -27,13 +28,10 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.net.InetAddresses;
-import com.google.common.testing.FakeTicker;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
@@ -42,6 +40,7 @@ import io.grpc.internal.DnsNameResolver.ResolutionResults;
import io.grpc.internal.DnsNameResolver.ResourceResolver;
import io.grpc.internal.DnsNameResolver.ResourceResolverFactory;
import io.grpc.internal.SharedResourceHolder.Resource;
+import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -55,8 +54,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -108,25 +105,21 @@ public class DnsNameResolverTest {
private NameResolver.Listener mockListener;
@Captor
private ArgumentCaptor<List<EquivalentAddressGroup>> resultCaptor;
- @Nullable
- private String networkaddressCacheTtlPropertyValue;
private DnsNameResolver newResolver(String name, int port) {
- return newResolver(name, port, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted());
+ return newResolver(name, port, GrpcUtil.NOOP_PROXY_DETECTOR);
}
private DnsNameResolver newResolver(
String name,
int port,
- ProxyDetector proxyDetector,
- Stopwatch stopwatch) {
+ ProxyDetector proxyDetector) {
DnsNameResolver dnsResolver = new DnsNameResolver(
null,
name,
Attributes.newBuilder().set(NameResolver.Factory.PARAMS_DEFAULT_PORT, port).build(),
fakeExecutorResource,
- proxyDetector,
- stopwatch);
+ proxyDetector);
return dnsResolver;
}
@@ -134,19 +127,6 @@ public class DnsNameResolverTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
DnsNameResolver.enableJndi = true;
- networkaddressCacheTtlPropertyValue =
- System.getProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY);
- }
-
- @After
- public void restoreSystemProperty() {
- if (networkaddressCacheTtlPropertyValue == null) {
- System.clearProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY);
- } else {
- System.setProperty(
- DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY,
- networkaddressCacheTtlPropertyValue);
- }
}
@After
@@ -198,8 +178,7 @@ public class DnsNameResolverTest {
}
@Test
- public void resolve_neverCache() throws Exception {
- System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "0");
+ public void resolve() throws Exception {
final List<InetAddress> answer1 = createAddressList(2);
final List<InetAddress> answer2 = createAddressList(1);
String name = "foo.googleapis.com";
@@ -227,156 +206,6 @@ public class DnsNameResolverTest {
}
@Test
- public void resolve_cacheForever() throws Exception {
- System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "-1");
- final List<InetAddress> answer1 = createAddressList(2);
- String name = "foo.googleapis.com";
- FakeTicker fakeTicker = new FakeTicker();
-
- DnsNameResolver resolver =
- newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
- AddressResolver mockResolver = mock(AddressResolver.class);
- when(mockResolver.resolveAddress(Matchers.anyString()))
- .thenReturn(answer1)
- .thenThrow(new AssertionError("should not called twice"));
- resolver.setAddressResolver(mockResolver);
-
- resolver.start(mockListener);
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
- assertAnswerMatches(answer1, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- fakeTicker.advance(1, TimeUnit.DAYS);
- resolver.refresh();
- assertEquals(1, fakeExecutor.runDueTasks());
- verifyNoMoreInteractions(mockListener);
- assertAnswerMatches(answer1, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- resolver.shutdown();
-
- verify(mockResolver).resolveAddress(Matchers.anyString());
- }
-
- @Test
- public void resolve_usingCache() throws Exception {
- long ttl = 60;
- System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, Long.toString(ttl));
- final List<InetAddress> answer = createAddressList(2);
- String name = "foo.googleapis.com";
- FakeTicker fakeTicker = new FakeTicker();
-
- DnsNameResolver resolver =
- newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
- AddressResolver mockResolver = mock(AddressResolver.class);
- when(mockResolver.resolveAddress(Matchers.anyString()))
- .thenReturn(answer)
- .thenThrow(new AssertionError("should not reach here."));
- resolver.setAddressResolver(mockResolver);
-
- resolver.start(mockListener);
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
- assertAnswerMatches(answer, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- // this refresh should return cached result
- fakeTicker.advance(ttl - 1, TimeUnit.SECONDS);
- resolver.refresh();
- assertEquals(1, fakeExecutor.runDueTasks());
- verifyNoMoreInteractions(mockListener);
- assertAnswerMatches(answer, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- resolver.shutdown();
-
- verify(mockResolver).resolveAddress(Matchers.anyString());
- }
-
- @Test
- public void resolve_cacheExpired() throws Exception {
- long ttl = 60;
- System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, Long.toString(ttl));
- final List<InetAddress> answer1 = createAddressList(2);
- final List<InetAddress> answer2 = createAddressList(1);
- String name = "foo.googleapis.com";
- FakeTicker fakeTicker = new FakeTicker();
-
- DnsNameResolver resolver =
- newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
- AddressResolver mockResolver = mock(AddressResolver.class);
- when(mockResolver.resolveAddress(Matchers.anyString())).thenReturn(answer1).thenReturn(answer2);
- resolver.setAddressResolver(mockResolver);
-
- resolver.start(mockListener);
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
- assertAnswerMatches(answer1, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- fakeTicker.advance(ttl + 1, TimeUnit.SECONDS);
- resolver.refresh();
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockListener, times(2)).onAddresses(resultCaptor.capture(), any(Attributes.class));
- assertAnswerMatches(answer2, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- resolver.shutdown();
-
- verify(mockResolver, times(2)).resolveAddress(Matchers.anyString());
- }
-
- @Test
- public void resolve_invalidTtlPropertyValue() throws Exception {
- System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "not_a_number");
- resolveDefaultValue();
- }
-
- @Test
- public void resolve_noPropertyValue() throws Exception {
- System.clearProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY);
- resolveDefaultValue();
- }
-
- private void resolveDefaultValue() throws Exception {
- final List<InetAddress> answer1 = createAddressList(2);
- final List<InetAddress> answer2 = createAddressList(1);
- String name = "foo.googleapis.com";
- FakeTicker fakeTicker = new FakeTicker();
-
- DnsNameResolver resolver =
- newResolver(name, 81, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(fakeTicker));
- AddressResolver mockResolver = mock(AddressResolver.class);
- when(mockResolver.resolveAddress(Matchers.anyString())).thenReturn(answer1).thenReturn(answer2);
- resolver.setAddressResolver(mockResolver);
-
- resolver.start(mockListener);
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
- assertAnswerMatches(answer1, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- fakeTicker.advance(DnsNameResolver.DEFAULT_NETWORK_CACHE_TTL_SECONDS, TimeUnit.SECONDS);
- resolver.refresh();
- assertEquals(1, fakeExecutor.runDueTasks());
- verifyNoMoreInteractions(mockListener);
- assertAnswerMatches(answer1, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- fakeTicker.advance(1, TimeUnit.SECONDS);
- resolver.refresh();
- assertEquals(1, fakeExecutor.runDueTasks());
- verify(mockListener, times(2)).onAddresses(resultCaptor.capture(), any(Attributes.class));
- assertAnswerMatches(answer2, 81, resultCaptor.getValue());
- assertEquals(0, fakeClock.numPendingTasks());
-
- resolver.shutdown();
-
- verify(mockResolver, times(2)).resolveAddress(Matchers.anyString());
- }
-
- @Test
public void resolveAll_nullResourceResolver() throws Exception {
final String hostname = "addr.fake";
final Inet4Address backendAddr = InetAddresses.fromInteger(0x7f000001);
@@ -397,6 +226,23 @@ public class DnsNameResolverTest {
}
@Test
+ public void resolveAll_nullResourceResolver_addressFailure() throws Exception {
+ final String hostname = "addr.fake";
+
+ AddressResolver mockResolver = mock(AddressResolver.class);
+ when(mockResolver.resolveAddress(Matchers.anyString()))
+ .thenThrow(new IOException("no addr"));
+ ResourceResolver resourceResolver = null;
+ boolean resovleSrv = true;
+ boolean resolveTxt = true;
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("no addr");
+
+ DnsNameResolver.resolveAll(mockResolver, resourceResolver, resovleSrv, resolveTxt, hostname);
+ }
+
+ @Test
public void resolveAll_presentResourceResolver() throws Exception {
final String hostname = "addr.fake";
final Inet4Address backendAddr = InetAddresses.fromInteger(0x7f000001);
@@ -502,8 +348,7 @@ public class DnsNameResolverTest {
"password");
when(alwaysDetectProxy.proxyFor(any(SocketAddress.class)))
.thenReturn(proxyParameters);
- DnsNameResolver resolver =
- newResolver(name, port, alwaysDetectProxy, Stopwatch.createUnstarted());
+ DnsNameResolver resolver = newResolver(name, port, alwaysDetectProxy);
AddressResolver mockAddressResolver = mock(AddressResolver.class);
when(mockAddressResolver.resolveAddress(Matchers.anyString())).thenThrow(new AssertionError());
resolver.setAddressResolver(mockAddressResolver);
@@ -754,6 +599,82 @@ public class DnsNameResolverTest {
assertNotNull(DnsNameResolver.maybeChooseServiceConfig(choice, new Random(), "localhost"));
}
+ @Test
+ public void shouldUseJndi_alwaysFalseIfDisabled() {
+ boolean enableJndi = false;
+ boolean enableJndiLocalhost = true;
+ String host = "seemingly.valid.host";
+
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, host));
+ }
+
+ @Test
+ public void shouldUseJndi_falseIfDisabledForLocalhost() {
+ boolean enableJndi = true;
+ boolean enableJndiLocalhost = false;
+
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "localhost"));
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "LOCALHOST"));
+ }
+
+ @Test
+ public void shouldUseJndi_trueIfLocalhostOverriden() {
+ boolean enableJndi = true;
+ boolean enableJndiLocalhost = true;
+ String host = "localhost";
+
+ assertTrue(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, host));
+ }
+
+ @Test
+ public void shouldUseJndi_falseForIpv6() {
+ boolean enableJndi = true;
+ boolean enableJndiLocalhost = false;
+
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "::"));
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "::1"));
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "2001:db8:1234::"));
+ assertFalse(DnsNameResolver.shouldUseJndi(
+ enableJndi, enableJndiLocalhost, "[2001:db8:1234::]"));
+ assertFalse(DnsNameResolver.shouldUseJndi(
+ enableJndi, enableJndiLocalhost, "2001:db8:1234::%3"));
+ }
+
+ @Test
+ public void shouldUseJndi_falseForIpv4() {
+ boolean enableJndi = true;
+ boolean enableJndiLocalhost = false;
+
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "127.0.0.1"));
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "192.168.0.1"));
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "134744072"));
+ }
+
+ @Test
+ public void shouldUseJndi_falseForEmpty() {
+ boolean enableJndi = true;
+ boolean enableJndiLocalhost = false;
+
+ assertFalse(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, ""));
+ }
+
+ @Test
+ public void shouldUseJndi_trueIfItMightPossiblyBeValid() {
+ boolean enableJndi = true;
+ boolean enableJndiLocalhost = false;
+
+ assertTrue(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "remotehost"));
+ assertTrue(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "remotehost.gov"));
+ assertTrue(DnsNameResolver.shouldUseJndi(enableJndi, enableJndiLocalhost, "f.q.d.n."));
+ assertTrue(DnsNameResolver.shouldUseJndi(
+ enableJndi, enableJndiLocalhost, "8.8.8.8.in-addr.arpa."));
+ assertTrue(DnsNameResolver.shouldUseJndi(
+ enableJndi, enableJndiLocalhost, "2001-db8-1234--as3.ipv6-literal.net"));
+
+
+
+ }
+
private void testInvalidUri(URI uri) {
try {
provider.newNameResolver(uri, NAME_RESOLVER_PARAMS);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index ffc92b1bf..c5fdf713e 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -57,7 +57,6 @@ import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.BinaryLog;
import io.grpc.CallCredentials;
-import io.grpc.CallCredentials.MetadataApplier;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -1388,6 +1387,7 @@ public class ManagedChannelImplTest {
* propagated to newStream() and applyRequestMetadata().
*/
@Test
+ @SuppressWarnings("deprecation")
public void informationPropagatedToNewStreamAndCallCredentials() {
createChannel();
CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
@@ -1401,9 +1401,9 @@ public class ManagedChannelImplTest {
credsApplyContexts.add(Context.current());
return null;
}
- }).when(creds).applyRequestMetadata(
+ }).when(creds).applyRequestMetadata( // TODO(zhangkun83): remove suppression of deprecations
any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
- any(MetadataApplier.class));
+ any(CallCredentials.MetadataApplier.class));
// First call will be on delayed transport. Only newCall() is run within the expected context,
// so that we can verify that the context is explicitly attached before calling newStream() and
@@ -1435,7 +1435,7 @@ public class ManagedChannelImplTest {
verify(creds, never()).applyRequestMetadata(
any(MethodDescriptor.class), any(Attributes.class), any(Executor.class),
- any(MetadataApplier.class));
+ any(CallCredentials.MetadataApplier.class));
// applyRequestMetadata() is called after the transport becomes ready.
transportInfo.listener.transportReady();
@@ -1444,7 +1444,8 @@ public class ManagedChannelImplTest {
helper.updateBalancingState(READY, mockPicker);
executor.runDueTasks();
ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(Attributes.class);
- ArgumentCaptor<MetadataApplier> applierCaptor = ArgumentCaptor.forClass(MetadataApplier.class);
+ ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor
+ = ArgumentCaptor.forClass(CallCredentials.MetadataApplier.class);
verify(creds).applyRequestMetadata(same(method), attrsCaptor.capture(),
same(executor.getScheduledExecutorService()), applierCaptor.capture());
assertEquals("testValue", testKey.get(credsApplyContexts.poll()));
@@ -2761,7 +2762,7 @@ public class ManagedChannelImplTest {
ManagedChannel mychannel = new CustomBuilder()
.nameResolverFactory(factory)
- .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory()).build();
+ .loadBalancerFactory(new AutoConfiguredLoadBalancerFactory(null, null)).build();
ClientCall<Void, Void> call1 =
mychannel.newCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT);
@@ -2801,6 +2802,14 @@ public class ManagedChannelImplTest {
mychannel.shutdownNow();
}
+ @Test
+ public void getAuthorityAfterShutdown() throws Exception {
+ createChannel();
+ assertEquals(SERVICE_NAME, channel.authority());
+ channel.shutdownNow().awaitTermination(1, TimeUnit.SECONDS);
+ assertEquals(SERVICE_NAME, channel.authority());
+ }
+
private static final class ChannelBuilder
extends AbstractManagedChannelImplBuilder<ChannelBuilder> {
diff --git a/core/src/test/java/io/grpc/util/ForwardingLoadBalancerHelperTest.java b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerHelperTest.java
new file mode 100644
index 000000000..ebc883b9a
--- /dev/null
+++ b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerHelperTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static org.mockito.Mockito.mock;
+
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.ForwardingTestUtil;
+import io.grpc.LoadBalancer;
+import java.lang.reflect.Method;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ForwardingLoadBalancerHelper}. */
+@RunWith(JUnit4.class)
+public class ForwardingLoadBalancerHelperTest {
+ private final LoadBalancer.Helper mockDelegate = mock(LoadBalancer.Helper.class);
+
+ private final class TestHelper extends ForwardingLoadBalancerHelper {
+ @Override
+ protected LoadBalancer.Helper delegate() {
+ return mockDelegate;
+ }
+ }
+
+ @Test
+ public void allMethodsForwarded() throws Exception {
+ final SocketAddress mockAddr = mock(SocketAddress.class);
+ ForwardingTestUtil.testMethodsForwarded(
+ LoadBalancer.Helper.class,
+ mockDelegate,
+ new TestHelper(),
+ Collections.<Method>emptyList(),
+ new ForwardingTestUtil.ArgumentProvider() {
+ @Override
+ public Object get(Class<?> clazz) {
+ if (clazz.equals(EquivalentAddressGroup.class)) {
+ return new EquivalentAddressGroup(Arrays.asList(mockAddr));
+ } else if (clazz.equals(List.class)) {
+ return Collections.<Object>emptyList();
+ }
+ return null;
+ }
+ });
+ }
+}