diff options
author | Kun Zhang <zhangkun83@users.noreply.github.com> | 2018-06-12 14:04:45 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-06-12 14:04:45 -0700 |
commit | 15786520f9890774cc41abbf23835589a5e4e167 (patch) | |
tree | dac37d7e93a819d534fb923f80d5169ea2f37d3f /grpclb | |
parent | defb955f3ab233e11d960a42495ca955306d57a4 (diff) | |
download | grpc-grpc-java-15786520f9890774cc41abbf23835589a5e4e167.tar.gz |
grpclb: use exponential back-off for retries of balancer RPCs (#4525)
Diffstat (limited to 'grpclb')
4 files changed, 193 insertions, 6 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 5c428960c..24e870fb4 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -25,6 +25,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.Status; import io.grpc.grpclb.GrpclbConstants.LbPolicy; +import io.grpc.internal.BackoffPolicy; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.LogId; import io.grpc.internal.ObjectPool; @@ -55,6 +56,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { private final Factory roundRobinBalancerFactory; private final ObjectPool<ScheduledExecutorService> timerServicePool; private final TimeProvider time; + private final BackoffPolicy.Provider backoffPolicyProvider; // All mutable states in this class are mutated ONLY from Channel Executor @@ -71,7 +73,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { GrpclbLoadBalancer(Helper helper, SubchannelPool subchannelPool, Factory pickFirstBalancerFactory, Factory roundRobinBalancerFactory, ObjectPool<ScheduledExecutorService> timerServicePool, - TimeProvider time) { + TimeProvider time, BackoffPolicy.Provider backoffPolicyProvider) { this.helper = checkNotNull(helper, "helper"); this.pickFirstBalancerFactory = checkNotNull(pickFirstBalancerFactory, "pickFirstBalancerFactory"); @@ -80,6 +82,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { this.timerServicePool = checkNotNull(timerServicePool, "timerServicePool"); this.timerService = checkNotNull(timerServicePool.getObject(), "timerService"); this.time = checkNotNull(time, "time provider"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); this.subchannelPool.init(helper, timerService); setLbPolicy(LbPolicy.GRPCLB); @@ -163,8 +166,8 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { "roundRobinBalancerFactory.newLoadBalancer()"); break; case GRPCLB: - grpclbState = - new GrpclbState(helper, subchannelPool, time, timerService, logId); + grpclbState = new GrpclbState( + helper, subchannelPool, time, timerService, backoffPolicyProvider, logId); break; default: // Do nohting diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java index 4bde1f225..00159b480 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java @@ -19,6 +19,7 @@ package io.grpc.grpclb; import io.grpc.ExperimentalApi; import io.grpc.LoadBalancer; import io.grpc.PickFirstBalancerFactory; +import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TimeProvider; @@ -52,6 +53,7 @@ public class GrpclbLoadBalancerFactory extends LoadBalancer.Factory { // load should not be on the shared scheduled executor, we should use a combination of the // scheduled executor and the default app executor. SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), - TimeProvider.SYSTEM_TIME_PROVIDER); + TimeProvider.SYSTEM_TIME_PROVIDER, + new ExponentialBackoffPolicy.Provider()); } } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 606be53df..4da76158b 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -42,6 +42,7 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.grpclb.LoadBalanceResponse.LoadBalanceResponseTypeCase; +import io.grpc.internal.BackoffPolicy; import io.grpc.internal.LogId; import io.grpc.internal.TimeProvider; import io.grpc.stub.StreamObserver; @@ -102,6 +103,7 @@ final class GrpclbState { private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO = Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); + private final BackoffPolicy.Provider backoffPolicyProvider; // Scheduled only once. Never reset. @Nullable @@ -111,6 +113,11 @@ final class GrpclbState { // True if the current balancer has returned a serverlist. Will be reset to false when lost // connection to a balancer. private boolean balancerWorking; + @Nullable + private BackoffPolicy lbRpcRetryPolicy; + @Nullable + private LbRpcRetryTask lbRpcRetryTimer; + private long prevLbRpcStartNanos; @Nullable private ManagedChannel lbCommChannel; @@ -133,11 +140,13 @@ final class GrpclbState { SubchannelPool subchannelPool, TimeProvider time, ScheduledExecutorService timerService, + BackoffPolicy.Provider backoffPolicyProvider, LogId logId) { this.helper = checkNotNull(helper, "helper"); this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); this.time = checkNotNull(time, "time provider"); this.timerService = checkNotNull(timerService, "timerService"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority"); this.logId = checkNotNull(logId, "logId"); } @@ -262,6 +271,7 @@ final class GrpclbState { LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel); lbStream = new LbStream(stub); lbStream.start(); + prevLbRpcStartNanos = time.currentTimeNanos(); LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder() .setInitialRequest(InitialLoadBalanceRequest.newBuilder() @@ -280,6 +290,12 @@ final class GrpclbState { } } + private void cancelLbRpcRetryTimer() { + if (lbRpcRetryTimer != null) { + lbRpcRetryTimer.cancel(); + } + } + void shutdown() { shutdownLbComm(); // We close the subchannels through subchannelPool instead of helper just for convenience of @@ -290,6 +306,7 @@ final class GrpclbState { subchannels = Collections.emptyMap(); subchannelPool.clear(); cancelFallbackTimer(); + cancelLbRpcRetryTimer(); } void propagateError(Status status) { @@ -389,6 +406,32 @@ final class GrpclbState { } @VisibleForTesting + class LbRpcRetryTask implements Runnable { + private ScheduledFuture<?> scheduledFuture; + + @Override + public void run() { + helper.runSerialized(new Runnable() { + @Override + public void run() { + checkState( + lbRpcRetryTimer == LbRpcRetryTask.this, "LbRpc retry timer mismatch"); + startLbRpc(); + } + }); + } + + void cancel() { + scheduledFuture.cancel(false); + } + + void schedule(long delayNanos) { + checkState(scheduledFuture == null, "LbRpcRetryTask already scheduled"); + scheduledFuture = timerService.schedule(this, delayNanos, TimeUnit.NANOSECONDS); + } + } + + @VisibleForTesting class LoadReportingTask implements Runnable { private final LbStream stream; @@ -558,7 +601,27 @@ final class GrpclbState { balancerWorking = false; maybeUseFallbackBackends(); maybeUpdatePicker(); - startLbRpc(); + + long delayNanos = 0; + if (initialResponseReceived || lbRpcRetryPolicy == null) { + // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence + // has never been initialized. + lbRpcRetryPolicy = backoffPolicyProvider.get(); + } + // Backoff only when balancer wasn't working previously. + if (!initialResponseReceived) { + // The back-off policy determines the interval between consecutive RPC upstarts, thus the + // actual delay may be smaller than the value from the back-off policy, or even negative, + // depending how much time was spent in the previous RPC. + delayNanos = + prevLbRpcStartNanos + lbRpcRetryPolicy.nextBackoffNanos() - time.currentTimeNanos(); + } + if (delayNanos <= 0) { + startLbRpc(); + } else { + lbRpcRetryTimer = new LbRpcRetryTask(); + lbRpcRetryTimer.schedule(delayNanos); + } } void close(@Nullable Exception error) { diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index c952140c6..4e238981d 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -70,6 +70,7 @@ import io.grpc.grpclb.GrpclbState.ErrorEntry; import io.grpc.grpclb.GrpclbState.RoundRobinPicker; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ObjectPool; @@ -119,6 +120,13 @@ public class GrpclbLoadBalancerTest { return command instanceof GrpclbState.FallbackModeTask; } }; + private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command instanceof GrpclbState.LbRpcRetryTask; + } + }; @Mock private Helper helper; @@ -157,6 +165,12 @@ public class GrpclbLoadBalancerTest { private LoadBalancer roundRobinBalancer; @Mock private ObjectPool<ScheduledExecutorService> timerServicePool; + @Mock + private BackoffPolicy.Provider backoffPolicyProvider; + @Mock + private BackoffPolicy backoffPolicy1; + @Mock + private BackoffPolicy backoffPolicy2; private GrpclbLoadBalancer balancer; @SuppressWarnings("unchecked") @@ -238,10 +252,13 @@ public class GrpclbLoadBalancerTest { when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY); ScheduledExecutorService timerService = fakeClock.getScheduledExecutorService(); when(timerServicePool.getObject()).thenReturn(timerService); + when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); + when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); + when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); balancer = new GrpclbLoadBalancer( helper, subchannelPool, pickFirstBalancerFactory, roundRobinBalancerFactory, timerServicePool, - timeProvider); + timeProvider, backoffPolicyProvider); verify(subchannelPool).init(same(helper), same(timerService)); } @@ -1583,6 +1600,108 @@ public class GrpclbLoadBalancerTest { verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1"); } + @Test + public void grpclbBalancerStreamRetry() throws Exception { + LoadBalanceRequest expectedInitialRequest = + LoadBalanceRequest.newBuilder() + .setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build(); + InOrder inOrder = + inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); + List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); + Attributes grpclbResolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + + // First balancer RPC + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Balancer closes it immediately (erroneously) + lbResponseObserver.onCompleted(); + // Will start backoff sequence 1 (10ns) + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Fast-forward to a moment before the retry + fakeClock.forwardNanos(9); + verifyNoMoreInteractions(mockLbService); + // Then time for retry + fakeClock.forwardNanos(1); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Balancer closes it with an error. + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + // Will continue the backoff sequence 1 (100ns) + verifyNoMoreInteractions(backoffPolicyProvider); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Fast-forward to a moment before the retry + fakeClock.forwardNanos(100 - 1); + verifyNoMoreInteractions(mockLbService); + // Then time for retry + fakeClock.forwardNanos(1); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Balancer sends initial response. + lbResponseObserver.onNext(buildInitialResponse()); + + // Then breaks the RPC + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + + // Will reset the retry sequence and retry immediately, because balancer has responded. + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); + + // Fail the retry after spending 4ns + fakeClock.forwardNanos(4); + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + + // Will be on the first retry (10ns) of backoff sequence 2. + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Fast-forward to a moment before the retry, the time spent in the last try is deducted. + fakeClock.forwardNanos(10 - 4 - 1); + verifyNoMoreInteractions(mockLbService); + // Then time for retry + fakeClock.forwardNanos(1); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext(eq(expectedInitialRequest)); + assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER)); + + // Wrapping up + verify(backoffPolicyProvider, times(2)).get(); + verify(backoffPolicy1, times(2)).nextBackoffNanos(); + verify(backoffPolicy2, times(1)).nextBackoffNanos(); + } + private void deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState) { channelExecutor.execute(new Runnable() { |