aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorZHANG Dapeng <zdapeng@google.com>2018-08-23 14:38:10 -0700
committerGitHub <noreply@github.com>2018-08-23 14:38:10 -0700
commite9b656845045fd596c2b7a2c0425b9b80ef40848 (patch)
tree335ed815f02bd1d456d1d073c52c1b45f2c8d5a8 /core
parent3f05a6e33118ce7ee142c9769b43fa1524c01d60 (diff)
downloadgrpc-grpc-java-e9b656845045fd596c2b7a2c0425b9b80ef40848.tar.gz
core: plumb hedging policy
This is only API plumbing for hedging, following exactly the same way as its retry counterpart, so it is almost trivial.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/io/grpc/ManagedChannelBuilder.java8
-rw-r--r--core/src/main/java/io/grpc/internal/HedgingPolicy.java88
-rw-r--r--core/src/main/java/io/grpc/internal/ManagedChannelImpl.java7
-rw-r--r--core/src/main/java/io/grpc/internal/RetriableStream.java5
-rw-r--r--core/src/main/java/io/grpc/internal/RetryPolicy.java6
-rw-r--r--core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java110
-rw-r--r--core/src/main/java/io/grpc/internal/ServiceConfigUtil.java41
-rw-r--r--core/src/test/java/io/grpc/internal/HedgingPolicyTest.java153
-rw-r--r--core/src/test/java/io/grpc/internal/RetriableStreamTest.java9
-rw-r--r--core/src/test/java/io/grpc/internal/RetryPolicyTest.java6
-rw-r--r--core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java17
-rw-r--r--core/src/test/resources/io/grpc/internal/test_hedging_service_config.json54
12 files changed, 471 insertions, 33 deletions
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index ab825ac76..7fc48c58c 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -424,9 +424,9 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>>
/**
- * Disables the retry mechanism provided by the gRPC library. This is designed for the case when
- * users have their own retry implementation and want to avoid their own retry taking place
- * simultaneously with the gRPC library layer retry.
+ * Disables the retry and hedging mechanism provided by the gRPC library. This is designed for the
+ * case when users have their own retry implementation and want to avoid their own retry taking
+ * place simultaneously with the gRPC library layer retry.
*
* @return this
* @since 1.11.0
@@ -437,7 +437,7 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>>
}
/**
- * Enables the retry mechanism provided by the gRPC library.
+ * Enables the retry and hedging mechanism provided by the gRPC library.
*
* <p>This method may not work as expected for the current release because retry is not fully
* implemented yet.
diff --git a/core/src/main/java/io/grpc/internal/HedgingPolicy.java b/core/src/main/java/io/grpc/internal/HedgingPolicy.java
new file mode 100644
index 000000000..6e2b62110
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/HedgingPolicy.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.internal;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableSet;
+import io.grpc.Status.Code;
+import java.util.Collections;
+import java.util.Set;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Hedging policy data object.
+ */
+@Immutable
+class HedgingPolicy {
+ final int maxAttempts;
+ final long hedgingDelayNanos;
+ final Set<Code> nonFatalStatusCodes;
+
+ /** No hedging. */
+ static final HedgingPolicy DEFAULT =
+ new HedgingPolicy(1, 0, Collections.<Code>emptySet());
+
+ /**
+ * The caller is supposed to have validated the arguments and handled throwing exception or
+ * logging warnings already, so we avoid repeating args check here.
+ */
+ HedgingPolicy(int maxAttempts, long hedgingDelayNanos, Set<Code> nonFatalStatusCodes) {
+ this.maxAttempts = maxAttempts;
+ this.hedgingDelayNanos = hedgingDelayNanos;
+ this.nonFatalStatusCodes = ImmutableSet.copyOf(nonFatalStatusCodes);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+ HedgingPolicy that = (HedgingPolicy) other;
+ return maxAttempts == that.maxAttempts
+ && hedgingDelayNanos == that.hedgingDelayNanos
+ && Objects.equal(nonFatalStatusCodes, that.nonFatalStatusCodes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(maxAttempts, hedgingDelayNanos, nonFatalStatusCodes);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("maxAttempts", maxAttempts)
+ .add("hedgingDelayNanos", hedgingDelayNanos)
+ .add("nonFatalStatusCodes", nonFatalStatusCodes)
+ .toString();
+ }
+
+ /**
+ * Provides the most suitable hedging policy for a call.
+ */
+ interface Provider {
+
+ /**
+ * This method is used no more than once for each call. Never returns null.
+ */
+ HedgingPolicy get();
+ }
+}
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 52eeef975..020edef0e 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
+import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY;
import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY;
import com.google.common.annotations.VisibleForTesting;
@@ -504,7 +505,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
return new RetriableStream<ReqT>(
method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(),
- callOptions.getOption(RETRY_POLICY_KEY), throttle) {
+ callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY),
+ throttle) {
@Override
Status prestart() {
return uncommittedRetriableStreamsRegistry.add(this);
@@ -559,7 +561,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
- serviceConfigInterceptor = new ServiceConfigInterceptor(retryEnabled, builder.maxRetryAttempts);
+ serviceConfigInterceptor = new ServiceConfigInterceptor(
+ retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
Channel channel = new RealChannel();
channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
if (builder.binlog != null) {
diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java
index aa3481283..563434bc7 100644
--- a/core/src/main/java/io/grpc/internal/RetriableStream.java
+++ b/core/src/main/java/io/grpc/internal/RetriableStream.java
@@ -65,6 +65,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
// Must not modify it.
private final Metadata headers;
private final RetryPolicy.Provider retryPolicyProvider;
+ private final HedgingPolicy.Provider hedgingPolicyProvider;
private RetryPolicy retryPolicy;
/** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
@@ -97,7 +98,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
MethodDescriptor<ReqT, ?> method, Metadata headers,
ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
- RetryPolicy.Provider retryPolicyProvider, @Nullable Throttle throttle) {
+ RetryPolicy.Provider retryPolicyProvider, HedgingPolicy.Provider hedgingPolicyProvider,
+ @Nullable Throttle throttle) {
this.method = method;
this.channelBufferUsed = channelBufferUsed;
this.perRpcBufferLimit = perRpcBufferLimit;
@@ -106,6 +108,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
this.scheduledExecutorService = scheduledExecutorService;
this.headers = headers;
this.retryPolicyProvider = checkNotNull(retryPolicyProvider, "retryPolicyProvider");
+ this.hedgingPolicyProvider = checkNotNull(hedgingPolicyProvider, "hedgingPolicyProvider");
this.throttle = throttle;
}
diff --git a/core/src/main/java/io/grpc/internal/RetryPolicy.java b/core/src/main/java/io/grpc/internal/RetryPolicy.java
index 2a7099720..928e5dc7f 100644
--- a/core/src/main/java/io/grpc/internal/RetryPolicy.java
+++ b/core/src/main/java/io/grpc/internal/RetryPolicy.java
@@ -93,15 +93,13 @@ final class RetryPolicy {
}
/**
- * Providers the most suitable retry policy for a call when this will have to provide a retry
- * policy.
+ * Provides the most suitable retry policy for a call.
*/
interface Provider {
/**
- * This method is used no more than once for each call.
+ * This method is used no more than once for each call. Never returns null.
*/
- @Nonnull
RetryPolicy get();
}
}
diff --git a/core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java b/core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java
index 49a6e1c9f..b3616aad9 100644
--- a/core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java
+++ b/core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java
@@ -18,6 +18,7 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Verify.verify;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -60,14 +61,16 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
private final boolean retryEnabled;
private final int maxRetryAttemptsLimit;
+ private final int maxHedgedAttemptsLimit;
// Setting this to true and observing this equal to true are run in different threads.
private volatile boolean nameResolveComplete;
ServiceConfigInterceptor(
- boolean retryEnabled, int maxRetryAttemptsLimit) {
+ boolean retryEnabled, int maxRetryAttemptsLimit, int maxHedgedAttemptsLimit) {
this.retryEnabled = retryEnabled;
this.maxRetryAttemptsLimit = maxRetryAttemptsLimit;
+ this.maxHedgedAttemptsLimit = maxHedgedAttemptsLimit;
}
void handleUpdate(@Nonnull Map<String, Object> serviceConfig) {
@@ -86,7 +89,8 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
}
for (Map<String, Object> methodConfig : methodConfigs) {
- MethodInfo info = new MethodInfo(methodConfig, retryEnabled, maxRetryAttemptsLimit);
+ MethodInfo info = new MethodInfo(
+ methodConfig, retryEnabled, maxRetryAttemptsLimit, maxHedgedAttemptsLimit);
List<Map<String, Object>> nameList =
ServiceConfigUtil.getNameListFromMethodConfig(methodConfig);
@@ -129,6 +133,7 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
final Integer maxInboundMessageSize;
final Integer maxOutboundMessageSize;
final RetryPolicy retryPolicy;
+ final HedgingPolicy hedgingPolicy;
/**
* Constructor.
@@ -136,7 +141,8 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
* @param retryEnabled when false, the argument maxRetryAttemptsLimit will have no effect.
*/
MethodInfo(
- Map<String, Object> methodConfig, boolean retryEnabled, int maxRetryAttemptsLimit) {
+ Map<String, Object> methodConfig, boolean retryEnabled, int maxRetryAttemptsLimit,
+ int maxHedgedAttemptsLimit) {
timeoutNanos = ServiceConfigUtil.getTimeoutFromMethodConfig(methodConfig);
waitForReady = ServiceConfigUtil.getWaitForReadyFromMethodConfig(methodConfig);
maxInboundMessageSize =
@@ -154,10 +160,15 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
"maxOutboundMessageSize %s exceeds bounds", maxOutboundMessageSize);
}
- Map<String, Object> policy =
+ Map<String, Object> retryPolicyMap =
retryEnabled ? ServiceConfigUtil.getRetryPolicyFromMethodConfig(methodConfig) : null;
- retryPolicy =
- policy == null ? RetryPolicy.DEFAULT : retryPolicy(policy, maxRetryAttemptsLimit);
+ retryPolicy = retryPolicyMap == null
+ ? RetryPolicy.DEFAULT : retryPolicy(retryPolicyMap, maxRetryAttemptsLimit);
+
+ Map<String, Object> hedgingPolicyMap =
+ retryEnabled ? ServiceConfigUtil.getHedgingPolicyFromMethodConfig(methodConfig) : null;
+ hedgingPolicy = hedgingPolicyMap == null
+ ? HedgingPolicy.DEFAULT : hedgingPolicy(hedgingPolicyMap, maxHedgedAttemptsLimit);
}
@Override
@@ -190,6 +201,7 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
.toString();
}
+ @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0
private static RetryPolicy retryPolicy(Map<String, Object> retryPolicy, int maxAttemptsLimit) {
int maxAttempts = checkNotNull(
ServiceConfigUtil.getMaxAttemptsFromRetryPolicy(retryPolicy),
@@ -226,6 +238,7 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
EnumSet<Code> codes = EnumSet.noneOf(Code.class);
// service config doesn't say if duplicates are allowed, so just accept them.
for (String rawCode : rawCodes) {
+ verify(!"OK".equals(rawCode), "rawCode can not be \"OK\"");
codes.add(Code.valueOf(rawCode));
}
Set<Code> retryableStatusCodes = Collections.unmodifiableSet(codes);
@@ -236,9 +249,42 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
}
}
+ @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0
+ private static HedgingPolicy hedgingPolicy(
+ Map<String, Object> hedgingPolicy, int maxAttemptsLimit) {
+ int maxAttempts = checkNotNull(
+ ServiceConfigUtil.getMaxAttemptsFromHedgingPolicy(hedgingPolicy),
+ "maxAttempts cannot be empty");
+ checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
+ maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
+
+ long hedgingDelayNanos = checkNotNull(
+ ServiceConfigUtil.getHedgingDelayNanosFromHedgingPolicy(hedgingPolicy),
+ "hedgingDelay cannot be empty");
+ checkArgument(
+ hedgingDelayNanos >= 0, "hedgingDelay must not be negative: %s", hedgingDelayNanos);
+
+ List<String> rawCodes =
+ ServiceConfigUtil.getNonFatalStatusCodesFromHedgingPolicy(hedgingPolicy);
+ checkNotNull(rawCodes, "rawCodes must be present");
+ checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
+ EnumSet<Code> codes = EnumSet.noneOf(Code.class);
+ // service config doesn't say if duplicates are allowed, so just accept them.
+ for (String rawCode : rawCodes) {
+ verify(!"OK".equals(rawCode), "rawCode can not be \"OK\"");
+ codes.add(Code.valueOf(rawCode));
+ }
+ Set<Code> nonFatalStatusCodes = Collections.unmodifiableSet(codes);
+
+ return new HedgingPolicy(maxAttempts, hedgingDelayNanos, nonFatalStatusCodes);
+ }
+
static final CallOptions.Key<RetryPolicy.Provider> RETRY_POLICY_KEY =
CallOptions.Key.create("internal-retry-policy");
+ static final CallOptions.Key<HedgingPolicy.Provider> HEDGING_POLICY_KEY =
+ CallOptions.Key.create("internal-hedging-policy");
+ @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
@@ -252,7 +298,21 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
}
}
- callOptions = callOptions.withOption(RETRY_POLICY_KEY, new ImmediateRetryPolicyProvider());
+ final HedgingPolicy hedgingPolicy = getHedgingPolicyFromConfig(method);
+ final class ImmediateHedgingPolicyProvider implements HedgingPolicy.Provider {
+ @Override
+ public HedgingPolicy get() {
+ return hedgingPolicy;
+ }
+ }
+
+ verify(
+ retryPolicy.equals(RetryPolicy.DEFAULT) || hedgingPolicy.equals(HedgingPolicy.DEFAULT),
+ "Can not apply both retry and hedging policy for the method '%s'", method);
+
+ callOptions = callOptions
+ .withOption(RETRY_POLICY_KEY, new ImmediateRetryPolicyProvider())
+ .withOption(HEDGING_POLICY_KEY, new ImmediateHedgingPolicyProvider());
} else {
final class DelayedRetryPolicyProvider implements RetryPolicy.Provider {
/**
@@ -270,7 +330,30 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
}
}
- callOptions = callOptions.withOption(RETRY_POLICY_KEY, new DelayedRetryPolicyProvider());
+ final class DelayedHedgingPolicyProvider implements HedgingPolicy.Provider {
+ /**
+ * Returns HedgingPolicy.DEFAULT if name resolving is not complete at the moment the
+ * method is invoked, otherwise returns the HedgingPolicy computed from service config.
+ *
+ * <p>Note that this method is used no more than once for each call.
+ */
+ @Override
+ public HedgingPolicy get() {
+ if (!nameResolveComplete) {
+ return HedgingPolicy.DEFAULT;
+ }
+ HedgingPolicy hedgingPolicy = getHedgingPolicyFromConfig(method);
+ verify(
+ hedgingPolicy.equals(HedgingPolicy.DEFAULT)
+ || getRetryPolicyFromConfig(method).equals(RetryPolicy.DEFAULT),
+ "Can not apply both retry and hedging policy for the method '%s'", method);
+ return hedgingPolicy;
+ }
+ }
+
+ callOptions = callOptions
+ .withOption(RETRY_POLICY_KEY, new DelayedRetryPolicyProvider())
+ .withOption(HEDGING_POLICY_KEY, new DelayedHedgingPolicyProvider());
}
}
@@ -333,9 +416,12 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
@VisibleForTesting
RetryPolicy getRetryPolicyFromConfig(MethodDescriptor<?, ?> method) {
MethodInfo info = getMethodInfo(method);
- if (info == null || info.retryPolicy == null) {
- return RetryPolicy.DEFAULT;
- }
- return info.retryPolicy;
+ return info == null ? RetryPolicy.DEFAULT : info.retryPolicy;
+ }
+
+ @VisibleForTesting
+ HedgingPolicy getHedgingPolicyFromConfig(MethodDescriptor<?, ?> method) {
+ MethodInfo info = getMethodInfo(method);
+ return info == null ? HedgingPolicy.DEFAULT : info.hedgingPolicy;
}
}
diff --git a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
index ad0e36814..5295327b4 100644
--- a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
+++ b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java
@@ -45,6 +45,7 @@ public final class ServiceConfigUtil {
private static final String METHOD_CONFIG_MAX_RESPONSE_MESSAGE_BYTES_KEY =
"maxResponseMessageBytes";
private static final String METHOD_CONFIG_RETRY_POLICY_KEY = "retryPolicy";
+ private static final String METHOD_CONFIG_HEDGING_POLICY_KEY = "hedgingPolicy";
private static final String NAME_SERVICE_KEY = "service";
private static final String NAME_METHOD_KEY = "method";
private static final String RETRY_POLICY_MAX_ATTEMPTS_KEY = "maxAttempts";
@@ -52,6 +53,9 @@ public final class ServiceConfigUtil {
private static final String RETRY_POLICY_MAX_BACKOFF_KEY = "maxBackoff";
private static final String RETRY_POLICY_BACKOFF_MULTIPLIER_KEY = "backoffMultiplier";
private static final String RETRY_POLICY_RETRYABLE_STATUS_CODES_KEY = "retryableStatusCodes";
+ private static final String HEDGING_POLICY_MAX_ATTEMPTS_KEY = "maxAttempts";
+ private static final String HEDGING_POLICY_HEDGING_DELAY_KEY = "hedgingDelay";
+ private static final String HEDGING_POLICY_NON_FATAL_STATUS_CODES_KEY = "nonFatalStatusCodes";
private static final long DURATION_SECONDS_MIN = -315576000000L;
private static final long DURATION_SECONDS_MAX = 315576000000L;
@@ -144,6 +148,35 @@ public final class ServiceConfigUtil {
}
@Nullable
+ static Integer getMaxAttemptsFromHedgingPolicy(Map<String, Object> hedgingPolicy) {
+ if (!hedgingPolicy.containsKey(HEDGING_POLICY_MAX_ATTEMPTS_KEY)) {
+ return null;
+ }
+ return getDouble(hedgingPolicy, RETRY_POLICY_MAX_ATTEMPTS_KEY).intValue();
+ }
+
+ @Nullable
+ static Long getHedgingDelayNanosFromHedgingPolicy(Map<String, Object> hedgingPolicy) {
+ if (!hedgingPolicy.containsKey(HEDGING_POLICY_HEDGING_DELAY_KEY)) {
+ return null;
+ }
+ String rawHedgingDelay = getString(hedgingPolicy, HEDGING_POLICY_HEDGING_DELAY_KEY);
+ try {
+ return parseDuration(rawHedgingDelay);
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Nullable
+ static List<String> getNonFatalStatusCodesFromHedgingPolicy(Map<String, Object> hedgingPolicy) {
+ if (!hedgingPolicy.containsKey(HEDGING_POLICY_NON_FATAL_STATUS_CODES_KEY)) {
+ return null;
+ }
+ return checkStringList(getList(hedgingPolicy, HEDGING_POLICY_NON_FATAL_STATUS_CODES_KEY));
+ }
+
+ @Nullable
static String getServiceFromName(Map<String, Object> name) {
if (!name.containsKey(NAME_SERVICE_KEY)) {
return null;
@@ -168,6 +201,14 @@ public final class ServiceConfigUtil {
}
@Nullable
+ static Map<String, Object> getHedgingPolicyFromMethodConfig(Map<String, Object> methodConfig) {
+ if (!methodConfig.containsKey(METHOD_CONFIG_HEDGING_POLICY_KEY)) {
+ return null;
+ }
+ return getObject(methodConfig, METHOD_CONFIG_HEDGING_POLICY_KEY);
+ }
+
+ @Nullable
static List<Map<String, Object>> getNameListFromMethodConfig(Map<String, Object> methodConfig) {
if (!methodConfig.containsKey(METHOD_CONFIG_NAME_KEY)) {
return null;
diff --git a/core/src/test/java/io/grpc/internal/HedgingPolicyTest.java b/core/src/test/java/io/grpc/internal/HedgingPolicyTest.java
new file mode 100644
index 000000000..37e82af46
--- /dev/null
+++ b/core/src/test/java/io/grpc/internal/HedgingPolicyTest.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.internal;
+
+import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableSet;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status.Code;
+import io.grpc.testing.TestMethodDescriptors;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for HedgingPolicy. */
+@RunWith(JUnit4.class)
+public class HedgingPolicyTest {
+ @Test
+ public void getHedgingPolicies() throws Exception {
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(RetryPolicyTest.class.getResourceAsStream(
+ "/io/grpc/internal/test_hedging_service_config.json"), "UTF-8"));
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line).append('\n');
+ }
+ Object serviceConfigObj = JsonParser.parse(sb.toString());
+ assertTrue(serviceConfigObj instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj;
+
+ ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor(
+ /* retryEnabled = */ true, /* maxRetryAttemptsLimit = */ 3,
+ /* maxHedgedAttemptsLimit = */ 4);
+ serviceConfigInterceptor.handleUpdate(serviceConfig);
+
+ MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder();
+
+ MethodDescriptor<Void, Void> method = builder.setFullMethodName("not/exist").build();
+ assertEquals(
+ HedgingPolicy.DEFAULT,
+ serviceConfigInterceptor.getHedgingPolicyFromConfig(method));
+
+ method = builder.setFullMethodName("not_exist/Foo1").build();
+ assertEquals(
+ HedgingPolicy.DEFAULT,
+ serviceConfigInterceptor.getHedgingPolicyFromConfig(method));
+
+ method = builder.setFullMethodName("SimpleService1/not_exist").build();
+
+ assertEquals(
+ new HedgingPolicy(
+ 3,
+ TimeUnit.MILLISECONDS.toNanos(2100),
+ ImmutableSet.of(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED)),
+ serviceConfigInterceptor.getHedgingPolicyFromConfig(method));
+
+ method = builder.setFullMethodName("SimpleService1/Foo1").build();
+ assertEquals(
+ new HedgingPolicy(
+ 4,
+ TimeUnit.MILLISECONDS.toNanos(100),
+ ImmutableSet.of(Code.UNAVAILABLE)),
+ serviceConfigInterceptor.getHedgingPolicyFromConfig(method));
+
+ method = builder.setFullMethodName("SimpleService2/not_exist").build();
+ assertEquals(
+ HedgingPolicy.DEFAULT,
+ serviceConfigInterceptor.getHedgingPolicyFromConfig(method));
+
+ method = builder.setFullMethodName("SimpleService2/Foo2").build();
+ assertEquals(
+ new HedgingPolicy(
+ 4,
+ TimeUnit.MILLISECONDS.toNanos(100),
+ ImmutableSet.of(Code.UNAVAILABLE)),
+ serviceConfigInterceptor.getHedgingPolicyFromConfig(method));
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ @Test
+ public void getRetryPolicies_hedgingDisabled() throws Exception {
+ Channel channel = mock(Channel.class);
+ ArgumentCaptor<CallOptions> callOptionsCap = ArgumentCaptor.forClass(CallOptions.class);
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(RetryPolicyTest.class.getResourceAsStream(
+ "/io/grpc/internal/test_hedging_service_config.json"), "UTF-8"));
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line).append('\n');
+ }
+ Object serviceConfigObj = JsonParser.parse(sb.toString());
+ assertTrue(serviceConfigObj instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj;
+
+ ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor(
+ /* retryEnabled = */ false, /* maxRetryAttemptsLimit = */ 3,
+ /* maxHedgedAttemptsLimit = */ 4);
+ serviceConfigInterceptor.handleUpdate(serviceConfig);
+
+ MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder();
+
+ MethodDescriptor<Void, Void> method =
+ builder.setFullMethodName("SimpleService1/Foo1").build();
+
+ serviceConfigInterceptor.interceptCall(method, CallOptions.DEFAULT, channel);
+ verify(channel).newCall(eq(method), callOptionsCap.capture());
+ assertThat(callOptionsCap.getValue().getOption(HEDGING_POLICY_KEY)).isNull();
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
index 7f219376a..289430f5a 100644
--- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java
@@ -131,6 +131,7 @@ public class RetriableStreamTest {
Executor callExecutor,
ScheduledExecutorService scheduledExecutorService,
final RetryPolicy retryPolicy,
+ final HedgingPolicy hedgingPolicy,
@Nullable Throttle throttle) {
super(
method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, callExecutor,
@@ -141,6 +142,12 @@ public class RetriableStreamTest {
return retryPolicy;
}
},
+ new HedgingPolicy.Provider() {
+ @Override
+ public HedgingPolicy get() {
+ return hedgingPolicy;
+ }
+ },
throttle);
}
@@ -173,7 +180,7 @@ public class RetriableStreamTest {
return new RecordedRetriableStream(
method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY,
- throttle);
+ HedgingPolicy.DEFAULT, throttle);
}
@After
diff --git a/core/src/test/java/io/grpc/internal/RetryPolicyTest.java b/core/src/test/java/io/grpc/internal/RetryPolicyTest.java
index c62eb1bcb..e9296b96b 100644
--- a/core/src/test/java/io/grpc/internal/RetryPolicyTest.java
+++ b/core/src/test/java/io/grpc/internal/RetryPolicyTest.java
@@ -63,7 +63,8 @@ public class RetryPolicyTest {
Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj;
ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor(
- true /* retryEnabled */, 4 /* maxRetryAttemptsLimit */);
+ /* retryEnabled = */ true, /* maxRetryAttemptsLimit = */ 4,
+ /* maxHedgedAttemptsLimit = */ 3);
serviceConfigInterceptor.handleUpdate(serviceConfig);
MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder();
@@ -140,7 +141,8 @@ public class RetryPolicyTest {
Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj;
ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor(
- false /* retryEnabled */, 4 /* maxRetryAttemptsLimit */);
+ /* retryEnabled = */ false, /* maxRetryAttemptsLimit = */ 4,
+ /* maxHedgedAttemptsLimit = */ 3);
serviceConfigInterceptor.handleUpdate(serviceConfig);
MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder();
diff --git a/core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java b/core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java
index dcf1f76c5..4d06df1b8 100644
--- a/core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java
+++ b/core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java
@@ -17,6 +17,7 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY;
import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
@@ -60,7 +61,7 @@ public class ServiceConfigInterceptorTest {
}
private final ServiceConfigInterceptor interceptor = new ServiceConfigInterceptor(
- true /* retryEnabled */, 5 /* maxRetryAttemptsLimit */);
+ /* retryEnabled = */ true, /* maxRetryAttemptsLimit = */ 5, /* maxHedgedAttemptsLimit = */ 6);
private final String fullMethodName =
MethodDescriptor.generateFullMethodName("service", "method");
@@ -108,6 +109,8 @@ public class ServiceConfigInterceptorTest {
assertThat(callOptionsCap.getValue().isWaitForReady()).isFalse();
assertThat(callOptionsCap.getValue().getOption(RETRY_POLICY_KEY).get())
.isEqualTo(RetryPolicy.DEFAULT);
+ assertThat(callOptionsCap.getValue().getOption(HEDGING_POLICY_KEY).get())
+ .isEqualTo(HedgingPolicy.DEFAULT);
}
@Test
@@ -363,9 +366,9 @@ public class ServiceConfigInterceptorTest {
assertThat(interceptor.serviceMethodMap.get())
.containsExactly(
methodDescriptor.getFullMethodName(),
- new MethodInfo(methodConfig, false, 1));
+ new MethodInfo(methodConfig, false, 1, 1));
assertThat(interceptor.serviceMap.get()).containsExactly(
- "service2", new MethodInfo(methodConfig, false, 1));
+ "service2", new MethodInfo(methodConfig, false, 1, 1));
}
@@ -376,7 +379,7 @@ public class ServiceConfigInterceptorTest {
thrown.expectMessage("Duration value is out of range");
- new MethodInfo(methodConfig, false, 1);
+ new MethodInfo(methodConfig, false, 1, 1);
}
@Test
@@ -384,7 +387,7 @@ public class ServiceConfigInterceptorTest {
JsonObj name = new JsonObj("service", "service");
JsonObj methodConfig = new JsonObj("name", new JsonList(name), "timeout", "315576000000s");
- MethodInfo info = new MethodInfo(methodConfig, false, 1);
+ MethodInfo info = new MethodInfo(methodConfig, false, 1, 1);
assertThat(info.timeoutNanos).isEqualTo(Long.MAX_VALUE);
}
@@ -398,7 +401,7 @@ public class ServiceConfigInterceptorTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("exceeds bounds");
- new MethodInfo(methodConfig, false, 1);
+ new MethodInfo(methodConfig, false, 1, 1);
}
@Test
@@ -409,7 +412,7 @@ public class ServiceConfigInterceptorTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("exceeds bounds");
- new MethodInfo(methodConfig, false, 1);
+ new MethodInfo(methodConfig, false, 1, 1);
}
private static final class NoopMarshaller implements MethodDescriptor.Marshaller<Void> {
diff --git a/core/src/test/resources/io/grpc/internal/test_hedging_service_config.json b/core/src/test/resources/io/grpc/internal/test_hedging_service_config.json
new file mode 100644
index 000000000..5533973dd
--- /dev/null
+++ b/core/src/test/resources/io/grpc/internal/test_hedging_service_config.json
@@ -0,0 +1,54 @@
+{
+ "loadBalancingPolicy":"round_robin",
+ "methodConfig":[
+ {
+ "name":[
+ {
+ "service":"SimpleService1"
+ }
+ ],
+ "waitForReady":false,
+ "hedgingPolicy":{
+ "maxAttempts":3,
+ "hedgingDelay":"2.1s",
+ "nonFatalStatusCodes":[
+ "UNAVAILABLE",
+ "RESOURCE_EXHAUSTED"
+ ]
+ }
+ },
+ {
+ "name":[
+ {
+ "service":"SimpleService2"
+ }
+ ],
+ "waitForReady":false
+ },
+ {
+ "name":[
+ {
+ "service":"SimpleService1",
+ "method":"Foo1"
+ },
+ {
+ "service":"SimpleService2",
+ "method":"Foo2"
+ }
+ ],
+ "waitForReady":true,
+ "hedgingPolicy":{
+ "maxAttempts":5,
+ "hedgingDelay":"0.1s",
+ "nonFatalStatusCodes":[
+ "UNAVAILABLE"
+ ]
+ }
+ }
+ ],
+
+ "retryThrottling": {
+ "maxTokens": 10,
+ "tokenRatio": 0.1
+ }
+}