diff options
author | ZHANG Dapeng <zdapeng@google.com> | 2018-08-23 14:38:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-23 14:38:10 -0700 |
commit | e9b656845045fd596c2b7a2c0425b9b80ef40848 (patch) | |
tree | 335ed815f02bd1d456d1d073c52c1b45f2c8d5a8 /core | |
parent | 3f05a6e33118ce7ee142c9769b43fa1524c01d60 (diff) | |
download | grpc-grpc-java-e9b656845045fd596c2b7a2c0425b9b80ef40848.tar.gz |
core: plumb hedging policy
This is only API plumbing for hedging, following exactly the same way as its retry counterpart, so it is almost trivial.
Diffstat (limited to 'core')
12 files changed, 471 insertions, 33 deletions
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java index ab825ac76..7fc48c58c 100644 --- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -424,9 +424,9 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>> /** - * Disables the retry mechanism provided by the gRPC library. This is designed for the case when - * users have their own retry implementation and want to avoid their own retry taking place - * simultaneously with the gRPC library layer retry. + * Disables the retry and hedging mechanism provided by the gRPC library. This is designed for the + * case when users have their own retry implementation and want to avoid their own retry taking + * place simultaneously with the gRPC library layer retry. * * @return this * @since 1.11.0 @@ -437,7 +437,7 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>> } /** - * Enables the retry mechanism provided by the gRPC library. + * Enables the retry and hedging mechanism provided by the gRPC library. * * <p>This method may not work as expected for the current release because retry is not fully * implemented yet. diff --git a/core/src/main/java/io/grpc/internal/HedgingPolicy.java b/core/src/main/java/io/grpc/internal/HedgingPolicy.java new file mode 100644 index 000000000..6e2b62110 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/HedgingPolicy.java @@ -0,0 +1,88 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import io.grpc.Status.Code; +import java.util.Collections; +import java.util.Set; +import javax.annotation.concurrent.Immutable; + +/** + * Hedging policy data object. + */ +@Immutable +class HedgingPolicy { + final int maxAttempts; + final long hedgingDelayNanos; + final Set<Code> nonFatalStatusCodes; + + /** No hedging. */ + static final HedgingPolicy DEFAULT = + new HedgingPolicy(1, 0, Collections.<Code>emptySet()); + + /** + * The caller is supposed to have validated the arguments and handled throwing exception or + * logging warnings already, so we avoid repeating args check here. + */ + HedgingPolicy(int maxAttempts, long hedgingDelayNanos, Set<Code> nonFatalStatusCodes) { + this.maxAttempts = maxAttempts; + this.hedgingDelayNanos = hedgingDelayNanos; + this.nonFatalStatusCodes = ImmutableSet.copyOf(nonFatalStatusCodes); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + HedgingPolicy that = (HedgingPolicy) other; + return maxAttempts == that.maxAttempts + && hedgingDelayNanos == that.hedgingDelayNanos + && Objects.equal(nonFatalStatusCodes, that.nonFatalStatusCodes); + } + + @Override + public int hashCode() { + return Objects.hashCode(maxAttempts, hedgingDelayNanos, nonFatalStatusCodes); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("maxAttempts", maxAttempts) + .add("hedgingDelayNanos", hedgingDelayNanos) + .add("nonFatalStatusCodes", nonFatalStatusCodes) + .toString(); + } + + /** + * Provides the most suitable hedging policy for a call. + */ + interface Provider { + + /** + * This method is used no more than once for each call. Never returns null. + */ + HedgingPolicy get(); + } +} diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 52eeef975..020edef0e 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY; import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY; import com.google.common.annotations.VisibleForTesting; @@ -504,7 +505,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch return new RetriableStream<ReqT>( method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(), - callOptions.getOption(RETRY_POLICY_KEY), throttle) { + callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY), + throttle) { @Override Status prestart() { return uncommittedRetriableStreamsRegistry.add(this); @@ -559,7 +561,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch this.transportFactory = new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor); this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry; - serviceConfigInterceptor = new ServiceConfigInterceptor(retryEnabled, builder.maxRetryAttempts); + serviceConfigInterceptor = new ServiceConfigInterceptor( + retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts); Channel channel = new RealChannel(); channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor); if (builder.binlog != null) { diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index aa3481283..563434bc7 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -65,6 +65,7 @@ abstract class RetriableStream<ReqT> implements ClientStream { // Must not modify it. private final Metadata headers; private final RetryPolicy.Provider retryPolicyProvider; + private final HedgingPolicy.Provider hedgingPolicyProvider; private RetryPolicy retryPolicy; /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */ @@ -97,7 +98,8 @@ abstract class RetriableStream<ReqT> implements ClientStream { MethodDescriptor<ReqT, ?> method, Metadata headers, ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, Executor callExecutor, ScheduledExecutorService scheduledExecutorService, - RetryPolicy.Provider retryPolicyProvider, @Nullable Throttle throttle) { + RetryPolicy.Provider retryPolicyProvider, HedgingPolicy.Provider hedgingPolicyProvider, + @Nullable Throttle throttle) { this.method = method; this.channelBufferUsed = channelBufferUsed; this.perRpcBufferLimit = perRpcBufferLimit; @@ -106,6 +108,7 @@ abstract class RetriableStream<ReqT> implements ClientStream { this.scheduledExecutorService = scheduledExecutorService; this.headers = headers; this.retryPolicyProvider = checkNotNull(retryPolicyProvider, "retryPolicyProvider"); + this.hedgingPolicyProvider = checkNotNull(hedgingPolicyProvider, "hedgingPolicyProvider"); this.throttle = throttle; } diff --git a/core/src/main/java/io/grpc/internal/RetryPolicy.java b/core/src/main/java/io/grpc/internal/RetryPolicy.java index 2a7099720..928e5dc7f 100644 --- a/core/src/main/java/io/grpc/internal/RetryPolicy.java +++ b/core/src/main/java/io/grpc/internal/RetryPolicy.java @@ -93,15 +93,13 @@ final class RetryPolicy { } /** - * Providers the most suitable retry policy for a call when this will have to provide a retry - * policy. + * Provides the most suitable retry policy for a call. */ interface Provider { /** - * This method is used no more than once for each call. + * This method is used no more than once for each call. Never returns null. */ - @Nonnull RetryPolicy get(); } } diff --git a/core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java b/core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java index 49a6e1c9f..b3616aad9 100644 --- a/core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java +++ b/core/src/main/java/io/grpc/internal/ServiceConfigInterceptor.java @@ -18,6 +18,7 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Verify.verify; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -60,14 +61,16 @@ final class ServiceConfigInterceptor implements ClientInterceptor { private final boolean retryEnabled; private final int maxRetryAttemptsLimit; + private final int maxHedgedAttemptsLimit; // Setting this to true and observing this equal to true are run in different threads. private volatile boolean nameResolveComplete; ServiceConfigInterceptor( - boolean retryEnabled, int maxRetryAttemptsLimit) { + boolean retryEnabled, int maxRetryAttemptsLimit, int maxHedgedAttemptsLimit) { this.retryEnabled = retryEnabled; this.maxRetryAttemptsLimit = maxRetryAttemptsLimit; + this.maxHedgedAttemptsLimit = maxHedgedAttemptsLimit; } void handleUpdate(@Nonnull Map<String, Object> serviceConfig) { @@ -86,7 +89,8 @@ final class ServiceConfigInterceptor implements ClientInterceptor { } for (Map<String, Object> methodConfig : methodConfigs) { - MethodInfo info = new MethodInfo(methodConfig, retryEnabled, maxRetryAttemptsLimit); + MethodInfo info = new MethodInfo( + methodConfig, retryEnabled, maxRetryAttemptsLimit, maxHedgedAttemptsLimit); List<Map<String, Object>> nameList = ServiceConfigUtil.getNameListFromMethodConfig(methodConfig); @@ -129,6 +133,7 @@ final class ServiceConfigInterceptor implements ClientInterceptor { final Integer maxInboundMessageSize; final Integer maxOutboundMessageSize; final RetryPolicy retryPolicy; + final HedgingPolicy hedgingPolicy; /** * Constructor. @@ -136,7 +141,8 @@ final class ServiceConfigInterceptor implements ClientInterceptor { * @param retryEnabled when false, the argument maxRetryAttemptsLimit will have no effect. */ MethodInfo( - Map<String, Object> methodConfig, boolean retryEnabled, int maxRetryAttemptsLimit) { + Map<String, Object> methodConfig, boolean retryEnabled, int maxRetryAttemptsLimit, + int maxHedgedAttemptsLimit) { timeoutNanos = ServiceConfigUtil.getTimeoutFromMethodConfig(methodConfig); waitForReady = ServiceConfigUtil.getWaitForReadyFromMethodConfig(methodConfig); maxInboundMessageSize = @@ -154,10 +160,15 @@ final class ServiceConfigInterceptor implements ClientInterceptor { "maxOutboundMessageSize %s exceeds bounds", maxOutboundMessageSize); } - Map<String, Object> policy = + Map<String, Object> retryPolicyMap = retryEnabled ? ServiceConfigUtil.getRetryPolicyFromMethodConfig(methodConfig) : null; - retryPolicy = - policy == null ? RetryPolicy.DEFAULT : retryPolicy(policy, maxRetryAttemptsLimit); + retryPolicy = retryPolicyMap == null + ? RetryPolicy.DEFAULT : retryPolicy(retryPolicyMap, maxRetryAttemptsLimit); + + Map<String, Object> hedgingPolicyMap = + retryEnabled ? ServiceConfigUtil.getHedgingPolicyFromMethodConfig(methodConfig) : null; + hedgingPolicy = hedgingPolicyMap == null + ? HedgingPolicy.DEFAULT : hedgingPolicy(hedgingPolicyMap, maxHedgedAttemptsLimit); } @Override @@ -190,6 +201,7 @@ final class ServiceConfigInterceptor implements ClientInterceptor { .toString(); } + @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0 private static RetryPolicy retryPolicy(Map<String, Object> retryPolicy, int maxAttemptsLimit) { int maxAttempts = checkNotNull( ServiceConfigUtil.getMaxAttemptsFromRetryPolicy(retryPolicy), @@ -226,6 +238,7 @@ final class ServiceConfigInterceptor implements ClientInterceptor { EnumSet<Code> codes = EnumSet.noneOf(Code.class); // service config doesn't say if duplicates are allowed, so just accept them. for (String rawCode : rawCodes) { + verify(!"OK".equals(rawCode), "rawCode can not be \"OK\""); codes.add(Code.valueOf(rawCode)); } Set<Code> retryableStatusCodes = Collections.unmodifiableSet(codes); @@ -236,9 +249,42 @@ final class ServiceConfigInterceptor implements ClientInterceptor { } } + @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0 + private static HedgingPolicy hedgingPolicy( + Map<String, Object> hedgingPolicy, int maxAttemptsLimit) { + int maxAttempts = checkNotNull( + ServiceConfigUtil.getMaxAttemptsFromHedgingPolicy(hedgingPolicy), + "maxAttempts cannot be empty"); + checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts); + maxAttempts = Math.min(maxAttempts, maxAttemptsLimit); + + long hedgingDelayNanos = checkNotNull( + ServiceConfigUtil.getHedgingDelayNanosFromHedgingPolicy(hedgingPolicy), + "hedgingDelay cannot be empty"); + checkArgument( + hedgingDelayNanos >= 0, "hedgingDelay must not be negative: %s", hedgingDelayNanos); + + List<String> rawCodes = + ServiceConfigUtil.getNonFatalStatusCodesFromHedgingPolicy(hedgingPolicy); + checkNotNull(rawCodes, "rawCodes must be present"); + checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty"); + EnumSet<Code> codes = EnumSet.noneOf(Code.class); + // service config doesn't say if duplicates are allowed, so just accept them. + for (String rawCode : rawCodes) { + verify(!"OK".equals(rawCode), "rawCode can not be \"OK\""); + codes.add(Code.valueOf(rawCode)); + } + Set<Code> nonFatalStatusCodes = Collections.unmodifiableSet(codes); + + return new HedgingPolicy(maxAttempts, hedgingDelayNanos, nonFatalStatusCodes); + } + static final CallOptions.Key<RetryPolicy.Provider> RETRY_POLICY_KEY = CallOptions.Key.create("internal-retry-policy"); + static final CallOptions.Key<HedgingPolicy.Provider> HEDGING_POLICY_KEY = + CallOptions.Key.create("internal-hedging-policy"); + @SuppressWarnings("BetaApi") // Verify is stabilized since Guava v24.0 @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { @@ -252,7 +298,21 @@ final class ServiceConfigInterceptor implements ClientInterceptor { } } - callOptions = callOptions.withOption(RETRY_POLICY_KEY, new ImmediateRetryPolicyProvider()); + final HedgingPolicy hedgingPolicy = getHedgingPolicyFromConfig(method); + final class ImmediateHedgingPolicyProvider implements HedgingPolicy.Provider { + @Override + public HedgingPolicy get() { + return hedgingPolicy; + } + } + + verify( + retryPolicy.equals(RetryPolicy.DEFAULT) || hedgingPolicy.equals(HedgingPolicy.DEFAULT), + "Can not apply both retry and hedging policy for the method '%s'", method); + + callOptions = callOptions + .withOption(RETRY_POLICY_KEY, new ImmediateRetryPolicyProvider()) + .withOption(HEDGING_POLICY_KEY, new ImmediateHedgingPolicyProvider()); } else { final class DelayedRetryPolicyProvider implements RetryPolicy.Provider { /** @@ -270,7 +330,30 @@ final class ServiceConfigInterceptor implements ClientInterceptor { } } - callOptions = callOptions.withOption(RETRY_POLICY_KEY, new DelayedRetryPolicyProvider()); + final class DelayedHedgingPolicyProvider implements HedgingPolicy.Provider { + /** + * Returns HedgingPolicy.DEFAULT if name resolving is not complete at the moment the + * method is invoked, otherwise returns the HedgingPolicy computed from service config. + * + * <p>Note that this method is used no more than once for each call. + */ + @Override + public HedgingPolicy get() { + if (!nameResolveComplete) { + return HedgingPolicy.DEFAULT; + } + HedgingPolicy hedgingPolicy = getHedgingPolicyFromConfig(method); + verify( + hedgingPolicy.equals(HedgingPolicy.DEFAULT) + || getRetryPolicyFromConfig(method).equals(RetryPolicy.DEFAULT), + "Can not apply both retry and hedging policy for the method '%s'", method); + return hedgingPolicy; + } + } + + callOptions = callOptions + .withOption(RETRY_POLICY_KEY, new DelayedRetryPolicyProvider()) + .withOption(HEDGING_POLICY_KEY, new DelayedHedgingPolicyProvider()); } } @@ -333,9 +416,12 @@ final class ServiceConfigInterceptor implements ClientInterceptor { @VisibleForTesting RetryPolicy getRetryPolicyFromConfig(MethodDescriptor<?, ?> method) { MethodInfo info = getMethodInfo(method); - if (info == null || info.retryPolicy == null) { - return RetryPolicy.DEFAULT; - } - return info.retryPolicy; + return info == null ? RetryPolicy.DEFAULT : info.retryPolicy; + } + + @VisibleForTesting + HedgingPolicy getHedgingPolicyFromConfig(MethodDescriptor<?, ?> method) { + MethodInfo info = getMethodInfo(method); + return info == null ? HedgingPolicy.DEFAULT : info.hedgingPolicy; } } diff --git a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java index ad0e36814..5295327b4 100644 --- a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java +++ b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java @@ -45,6 +45,7 @@ public final class ServiceConfigUtil { private static final String METHOD_CONFIG_MAX_RESPONSE_MESSAGE_BYTES_KEY = "maxResponseMessageBytes"; private static final String METHOD_CONFIG_RETRY_POLICY_KEY = "retryPolicy"; + private static final String METHOD_CONFIG_HEDGING_POLICY_KEY = "hedgingPolicy"; private static final String NAME_SERVICE_KEY = "service"; private static final String NAME_METHOD_KEY = "method"; private static final String RETRY_POLICY_MAX_ATTEMPTS_KEY = "maxAttempts"; @@ -52,6 +53,9 @@ public final class ServiceConfigUtil { private static final String RETRY_POLICY_MAX_BACKOFF_KEY = "maxBackoff"; private static final String RETRY_POLICY_BACKOFF_MULTIPLIER_KEY = "backoffMultiplier"; private static final String RETRY_POLICY_RETRYABLE_STATUS_CODES_KEY = "retryableStatusCodes"; + private static final String HEDGING_POLICY_MAX_ATTEMPTS_KEY = "maxAttempts"; + private static final String HEDGING_POLICY_HEDGING_DELAY_KEY = "hedgingDelay"; + private static final String HEDGING_POLICY_NON_FATAL_STATUS_CODES_KEY = "nonFatalStatusCodes"; private static final long DURATION_SECONDS_MIN = -315576000000L; private static final long DURATION_SECONDS_MAX = 315576000000L; @@ -144,6 +148,35 @@ public final class ServiceConfigUtil { } @Nullable + static Integer getMaxAttemptsFromHedgingPolicy(Map<String, Object> hedgingPolicy) { + if (!hedgingPolicy.containsKey(HEDGING_POLICY_MAX_ATTEMPTS_KEY)) { + return null; + } + return getDouble(hedgingPolicy, RETRY_POLICY_MAX_ATTEMPTS_KEY).intValue(); + } + + @Nullable + static Long getHedgingDelayNanosFromHedgingPolicy(Map<String, Object> hedgingPolicy) { + if (!hedgingPolicy.containsKey(HEDGING_POLICY_HEDGING_DELAY_KEY)) { + return null; + } + String rawHedgingDelay = getString(hedgingPolicy, HEDGING_POLICY_HEDGING_DELAY_KEY); + try { + return parseDuration(rawHedgingDelay); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + + @Nullable + static List<String> getNonFatalStatusCodesFromHedgingPolicy(Map<String, Object> hedgingPolicy) { + if (!hedgingPolicy.containsKey(HEDGING_POLICY_NON_FATAL_STATUS_CODES_KEY)) { + return null; + } + return checkStringList(getList(hedgingPolicy, HEDGING_POLICY_NON_FATAL_STATUS_CODES_KEY)); + } + + @Nullable static String getServiceFromName(Map<String, Object> name) { if (!name.containsKey(NAME_SERVICE_KEY)) { return null; @@ -168,6 +201,14 @@ public final class ServiceConfigUtil { } @Nullable + static Map<String, Object> getHedgingPolicyFromMethodConfig(Map<String, Object> methodConfig) { + if (!methodConfig.containsKey(METHOD_CONFIG_HEDGING_POLICY_KEY)) { + return null; + } + return getObject(methodConfig, METHOD_CONFIG_HEDGING_POLICY_KEY); + } + + @Nullable static List<Map<String, Object>> getNameListFromMethodConfig(Map<String, Object> methodConfig) { if (!methodConfig.containsKey(METHOD_CONFIG_NAME_KEY)) { return null; diff --git a/core/src/test/java/io/grpc/internal/HedgingPolicyTest.java b/core/src/test/java/io/grpc/internal/HedgingPolicyTest.java new file mode 100644 index 000000000..37e82af46 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/HedgingPolicyTest.java @@ -0,0 +1,153 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableSet; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.MethodDescriptor; +import io.grpc.Status.Code; +import io.grpc.testing.TestMethodDescriptors; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; + +/** Unit tests for HedgingPolicy. */ +@RunWith(JUnit4.class) +public class HedgingPolicyTest { + @Test + public void getHedgingPolicies() throws Exception { + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(RetryPolicyTest.class.getResourceAsStream( + "/io/grpc/internal/test_hedging_service_config.json"), "UTF-8")); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + sb.append(line).append('\n'); + } + Object serviceConfigObj = JsonParser.parse(sb.toString()); + assertTrue(serviceConfigObj instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj; + + ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor( + /* retryEnabled = */ true, /* maxRetryAttemptsLimit = */ 3, + /* maxHedgedAttemptsLimit = */ 4); + serviceConfigInterceptor.handleUpdate(serviceConfig); + + MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder(); + + MethodDescriptor<Void, Void> method = builder.setFullMethodName("not/exist").build(); + assertEquals( + HedgingPolicy.DEFAULT, + serviceConfigInterceptor.getHedgingPolicyFromConfig(method)); + + method = builder.setFullMethodName("not_exist/Foo1").build(); + assertEquals( + HedgingPolicy.DEFAULT, + serviceConfigInterceptor.getHedgingPolicyFromConfig(method)); + + method = builder.setFullMethodName("SimpleService1/not_exist").build(); + + assertEquals( + new HedgingPolicy( + 3, + TimeUnit.MILLISECONDS.toNanos(2100), + ImmutableSet.of(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED)), + serviceConfigInterceptor.getHedgingPolicyFromConfig(method)); + + method = builder.setFullMethodName("SimpleService1/Foo1").build(); + assertEquals( + new HedgingPolicy( + 4, + TimeUnit.MILLISECONDS.toNanos(100), + ImmutableSet.of(Code.UNAVAILABLE)), + serviceConfigInterceptor.getHedgingPolicyFromConfig(method)); + + method = builder.setFullMethodName("SimpleService2/not_exist").build(); + assertEquals( + HedgingPolicy.DEFAULT, + serviceConfigInterceptor.getHedgingPolicyFromConfig(method)); + + method = builder.setFullMethodName("SimpleService2/Foo2").build(); + assertEquals( + new HedgingPolicy( + 4, + TimeUnit.MILLISECONDS.toNanos(100), + ImmutableSet.of(Code.UNAVAILABLE)), + serviceConfigInterceptor.getHedgingPolicyFromConfig(method)); + } finally { + if (reader != null) { + reader.close(); + } + } + } + + @Test + public void getRetryPolicies_hedgingDisabled() throws Exception { + Channel channel = mock(Channel.class); + ArgumentCaptor<CallOptions> callOptionsCap = ArgumentCaptor.forClass(CallOptions.class); + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(RetryPolicyTest.class.getResourceAsStream( + "/io/grpc/internal/test_hedging_service_config.json"), "UTF-8")); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + sb.append(line).append('\n'); + } + Object serviceConfigObj = JsonParser.parse(sb.toString()); + assertTrue(serviceConfigObj instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj; + + ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor( + /* retryEnabled = */ false, /* maxRetryAttemptsLimit = */ 3, + /* maxHedgedAttemptsLimit = */ 4); + serviceConfigInterceptor.handleUpdate(serviceConfig); + + MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder(); + + MethodDescriptor<Void, Void> method = + builder.setFullMethodName("SimpleService1/Foo1").build(); + + serviceConfigInterceptor.interceptCall(method, CallOptions.DEFAULT, channel); + verify(channel).newCall(eq(method), callOptionsCap.capture()); + assertThat(callOptionsCap.getValue().getOption(HEDGING_POLICY_KEY)).isNull(); + } finally { + if (reader != null) { + reader.close(); + } + } + } +} diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 7f219376a..289430f5a 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -131,6 +131,7 @@ public class RetriableStreamTest { Executor callExecutor, ScheduledExecutorService scheduledExecutorService, final RetryPolicy retryPolicy, + final HedgingPolicy hedgingPolicy, @Nullable Throttle throttle) { super( method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, callExecutor, @@ -141,6 +142,12 @@ public class RetriableStreamTest { return retryPolicy; } }, + new HedgingPolicy.Provider() { + @Override + public HedgingPolicy get() { + return hedgingPolicy; + } + }, throttle); } @@ -173,7 +180,7 @@ public class RetriableStreamTest { return new RecordedRetriableStream( method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT, MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY, - throttle); + HedgingPolicy.DEFAULT, throttle); } @After diff --git a/core/src/test/java/io/grpc/internal/RetryPolicyTest.java b/core/src/test/java/io/grpc/internal/RetryPolicyTest.java index c62eb1bcb..e9296b96b 100644 --- a/core/src/test/java/io/grpc/internal/RetryPolicyTest.java +++ b/core/src/test/java/io/grpc/internal/RetryPolicyTest.java @@ -63,7 +63,8 @@ public class RetryPolicyTest { Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj; ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor( - true /* retryEnabled */, 4 /* maxRetryAttemptsLimit */); + /* retryEnabled = */ true, /* maxRetryAttemptsLimit = */ 4, + /* maxHedgedAttemptsLimit = */ 3); serviceConfigInterceptor.handleUpdate(serviceConfig); MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder(); @@ -140,7 +141,8 @@ public class RetryPolicyTest { Map<String, Object> serviceConfig = (Map<String, Object>) serviceConfigObj; ServiceConfigInterceptor serviceConfigInterceptor = new ServiceConfigInterceptor( - false /* retryEnabled */, 4 /* maxRetryAttemptsLimit */); + /* retryEnabled = */ false, /* maxRetryAttemptsLimit = */ 4, + /* maxHedgedAttemptsLimit = */ 3); serviceConfigInterceptor.handleUpdate(serviceConfig); MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder(); diff --git a/core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java b/core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java index dcf1f76c5..4d06df1b8 100644 --- a/core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java +++ b/core/src/test/java/io/grpc/internal/ServiceConfigInterceptorTest.java @@ -17,6 +17,7 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.ServiceConfigInterceptor.HEDGING_POLICY_KEY; import static io.grpc.internal.ServiceConfigInterceptor.RETRY_POLICY_KEY; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.verify; @@ -60,7 +61,7 @@ public class ServiceConfigInterceptorTest { } private final ServiceConfigInterceptor interceptor = new ServiceConfigInterceptor( - true /* retryEnabled */, 5 /* maxRetryAttemptsLimit */); + /* retryEnabled = */ true, /* maxRetryAttemptsLimit = */ 5, /* maxHedgedAttemptsLimit = */ 6); private final String fullMethodName = MethodDescriptor.generateFullMethodName("service", "method"); @@ -108,6 +109,8 @@ public class ServiceConfigInterceptorTest { assertThat(callOptionsCap.getValue().isWaitForReady()).isFalse(); assertThat(callOptionsCap.getValue().getOption(RETRY_POLICY_KEY).get()) .isEqualTo(RetryPolicy.DEFAULT); + assertThat(callOptionsCap.getValue().getOption(HEDGING_POLICY_KEY).get()) + .isEqualTo(HedgingPolicy.DEFAULT); } @Test @@ -363,9 +366,9 @@ public class ServiceConfigInterceptorTest { assertThat(interceptor.serviceMethodMap.get()) .containsExactly( methodDescriptor.getFullMethodName(), - new MethodInfo(methodConfig, false, 1)); + new MethodInfo(methodConfig, false, 1, 1)); assertThat(interceptor.serviceMap.get()).containsExactly( - "service2", new MethodInfo(methodConfig, false, 1)); + "service2", new MethodInfo(methodConfig, false, 1, 1)); } @@ -376,7 +379,7 @@ public class ServiceConfigInterceptorTest { thrown.expectMessage("Duration value is out of range"); - new MethodInfo(methodConfig, false, 1); + new MethodInfo(methodConfig, false, 1, 1); } @Test @@ -384,7 +387,7 @@ public class ServiceConfigInterceptorTest { JsonObj name = new JsonObj("service", "service"); JsonObj methodConfig = new JsonObj("name", new JsonList(name), "timeout", "315576000000s"); - MethodInfo info = new MethodInfo(methodConfig, false, 1); + MethodInfo info = new MethodInfo(methodConfig, false, 1, 1); assertThat(info.timeoutNanos).isEqualTo(Long.MAX_VALUE); } @@ -398,7 +401,7 @@ public class ServiceConfigInterceptorTest { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("exceeds bounds"); - new MethodInfo(methodConfig, false, 1); + new MethodInfo(methodConfig, false, 1, 1); } @Test @@ -409,7 +412,7 @@ public class ServiceConfigInterceptorTest { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("exceeds bounds"); - new MethodInfo(methodConfig, false, 1); + new MethodInfo(methodConfig, false, 1, 1); } private static final class NoopMarshaller implements MethodDescriptor.Marshaller<Void> { diff --git a/core/src/test/resources/io/grpc/internal/test_hedging_service_config.json b/core/src/test/resources/io/grpc/internal/test_hedging_service_config.json new file mode 100644 index 000000000..5533973dd --- /dev/null +++ b/core/src/test/resources/io/grpc/internal/test_hedging_service_config.json @@ -0,0 +1,54 @@ +{ + "loadBalancingPolicy":"round_robin", + "methodConfig":[ + { + "name":[ + { + "service":"SimpleService1" + } + ], + "waitForReady":false, + "hedgingPolicy":{ + "maxAttempts":3, + "hedgingDelay":"2.1s", + "nonFatalStatusCodes":[ + "UNAVAILABLE", + "RESOURCE_EXHAUSTED" + ] + } + }, + { + "name":[ + { + "service":"SimpleService2" + } + ], + "waitForReady":false + }, + { + "name":[ + { + "service":"SimpleService1", + "method":"Foo1" + }, + { + "service":"SimpleService2", + "method":"Foo2" + } + ], + "waitForReady":true, + "hedgingPolicy":{ + "maxAttempts":5, + "hedgingDelay":"0.1s", + "nonFatalStatusCodes":[ + "UNAVAILABLE" + ] + } + } + ], + + "retryThrottling": { + "maxTokens": 10, + "tokenRatio": 0.1 + } +} |