about summary refs log tree commit diff
path: root/grpclb
diff options
context:
space:
mode:
author Kun Zhang <zhangkun83@users.noreply.github.com> 2018-06-12 14:04:45 -0700
committer GitHub <noreply@github.com> 2018-06-12 14:04:45 -0700
commit 15786520f9890774cc41abbf23835589a5e4e167 (patch)
tree dac37d7e93a819d534fb923f80d5169ea2f37d3f /grpclb
parent defb955f3ab233e11d960a42495ca955306d57a4 (diff)
download grpc-grpc-java-15786520f9890774cc41abbf23835589a5e4e167.tar.gz
grpclb: use exponential back-off for retries of balancer RPCs (#4525)
Diffstat (limited to 'grpclb')
-rw-r--r-- grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java 9
-rw-r--r-- grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java 4
-rw-r--r-- grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java 65
-rw-r--r-- grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java 121
4 files changed, 193 insertions, 6 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
index 5c428960c..24e870fb4 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
@@ -25,6 +25,7 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
+import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.LogId;
import io.grpc.internal.ObjectPool;
@@ -55,6 +56,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
private final Factory roundRobinBalancerFactory;
private final ObjectPool<ScheduledExecutorService> timerServicePool;
private final TimeProvider time;
+ private final BackoffPolicy.Provider backoffPolicyProvider;
// All mutable states in this class are mutated ONLY from Channel Executor
@@ -71,7 +73,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
GrpclbLoadBalancer(Helper helper, SubchannelPool subchannelPool, Factory pickFirstBalancerFactory,
Factory roundRobinBalancerFactory, ObjectPool<ScheduledExecutorService> timerServicePool,
- TimeProvider time) {
+ TimeProvider time, BackoffPolicy.Provider backoffPolicyProvider) {
this.helper = checkNotNull(helper, "helper");
this.pickFirstBalancerFactory =
checkNotNull(pickFirstBalancerFactory, "pickFirstBalancerFactory");
@@ -80,6 +82,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
this.timerServicePool = checkNotNull(timerServicePool, "timerServicePool");
this.timerService = checkNotNull(timerServicePool.getObject(), "timerService");
this.time = checkNotNull(time, "time provider");
+ this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.subchannelPool.init(helper, timerService);
setLbPolicy(LbPolicy.GRPCLB);
@@ -163,8 +166,8 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
"roundRobinBalancerFactory.newLoadBalancer()");
break;
case GRPCLB:
- grpclbState =
- new GrpclbState(helper, subchannelPool, time, timerService, logId);
+ grpclbState = new GrpclbState(
+ helper, subchannelPool, time, timerService, backoffPolicyProvider, logId);
break;
default:
// Do nothing
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java
index 4bde1f225..00159b480 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java
@@ -19,6 +19,7 @@ package io.grpc.grpclb;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.PickFirstBalancerFactory;
+import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TimeProvider;
@@ -52,6 +53,7 @@ public class GrpclbLoadBalancerFactory extends LoadBalancer.Factory {
// load should not be on the shared scheduled executor, we should use a combination of the
// scheduled executor and the default app executor.
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE),
- TimeProvider.SYSTEM_TIME_PROVIDER);
+ TimeProvider.SYSTEM_TIME_PROVIDER,
+ new ExponentialBackoffPolicy.Provider());
}
}
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
index 606be53df..4da76158b 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -42,6 +42,7 @@ import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.grpclb.LoadBalanceResponse.LoadBalanceResponseTypeCase;
+import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.LogId;
import io.grpc.internal.TimeProvider;
import io.grpc.stub.StreamObserver;
@@ -102,6 +103,7 @@ final class GrpclbState {
private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
+ private final BackoffPolicy.Provider backoffPolicyProvider;
// Scheduled only once. Never reset.
@Nullable
@@ -111,6 +113,11 @@ final class GrpclbState {
// True if the current balancer has returned a serverlist. Will be reset to false when lost
// connection to a balancer.
private boolean balancerWorking;
+ @Nullable
+ private BackoffPolicy lbRpcRetryPolicy;
+ @Nullable
+ private LbRpcRetryTask lbRpcRetryTimer;
+ private long prevLbRpcStartNanos;
@Nullable
private ManagedChannel lbCommChannel;
@@ -133,11 +140,13 @@ final class GrpclbState {
SubchannelPool subchannelPool,
TimeProvider time,
ScheduledExecutorService timerService,
+ BackoffPolicy.Provider backoffPolicyProvider,
LogId logId) {
this.helper = checkNotNull(helper, "helper");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.time = checkNotNull(time, "time provider");
this.timerService = checkNotNull(timerService, "timerService");
+ this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.serviceName = checkNotNull(helper.getAuthority(), "helper returns null authority");
this.logId = checkNotNull(logId, "logId");
}
@@ -262,6 +271,7 @@ final class GrpclbState {
LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
lbStream = new LbStream(stub);
lbStream.start();
+ prevLbRpcStartNanos = time.currentTimeNanos();
LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
.setInitialRequest(InitialLoadBalanceRequest.newBuilder()
@@ -280,6 +290,12 @@ final class GrpclbState {
}
}
+ private void cancelLbRpcRetryTimer() {
+ if (lbRpcRetryTimer != null) {
+ lbRpcRetryTimer.cancel();
+ }
+ }
+
void shutdown() {
shutdownLbComm();
// We close the subchannels through subchannelPool instead of helper just for convenience of
@@ -290,6 +306,7 @@ final class GrpclbState {
subchannels = Collections.emptyMap();
subchannelPool.clear();
cancelFallbackTimer();
+ cancelLbRpcRetryTimer();
}
void propagateError(Status status) {
@@ -389,6 +406,32 @@ final class GrpclbState {
}
@VisibleForTesting
+ class LbRpcRetryTask implements Runnable {
+ private ScheduledFuture<?> scheduledFuture;
+
+ @Override
+ public void run() {
+ helper.runSerialized(new Runnable() {
+ @Override
+ public void run() {
+ checkState(
+ lbRpcRetryTimer == LbRpcRetryTask.this, "LbRpc retry timer mismatch");
+ startLbRpc();
+ }
+ });
+ }
+
+ void cancel() {
+ scheduledFuture.cancel(false);
+ }
+
+ void schedule(long delayNanos) {
+ checkState(scheduledFuture == null, "LbRpcRetryTask already scheduled");
+ scheduledFuture = timerService.schedule(this, delayNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ @VisibleForTesting
class LoadReportingTask implements Runnable {
private final LbStream stream;
@@ -558,7 +601,27 @@ final class GrpclbState {
balancerWorking = false;
maybeUseFallbackBackends();
maybeUpdatePicker();
- startLbRpc();
+
+ long delayNanos = 0;
+ if (initialResponseReceived || lbRpcRetryPolicy == null) {
+ // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
+ // has never been initialized.
+ lbRpcRetryPolicy = backoffPolicyProvider.get();
+ }
+ // Backoff only when balancer wasn't working previously.
+ if (!initialResponseReceived) {
+ // The back-off policy determines the interval between consecutive RPC starts, thus the
+ // actual delay may be smaller than the value from the back-off policy, or even negative,
+ // depending on how much time was spent in the previous RPC.
+ delayNanos =
+ prevLbRpcStartNanos + lbRpcRetryPolicy.nextBackoffNanos() - time.currentTimeNanos();
+ }
+ if (delayNanos <= 0) {
+ startLbRpc();
+ } else {
+ lbRpcRetryTimer = new LbRpcRetryTask();
+ lbRpcRetryTimer.schedule(delayNanos);
+ }
}
void close(@Nullable Exception error) {
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index c952140c6..4e238981d 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -70,6 +70,7 @@ import io.grpc.grpclb.GrpclbState.ErrorEntry;
import io.grpc.grpclb.GrpclbState.RoundRobinPicker;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ObjectPool;
@@ -119,6 +120,13 @@ public class GrpclbLoadBalancerTest {
return command instanceof GrpclbState.FallbackModeTask;
}
};
+ private static final FakeClock.TaskFilter LB_RPC_RETRY_TASK_FILTER =
+ new FakeClock.TaskFilter() {
+ @Override
+ public boolean shouldAccept(Runnable command) {
+ return command instanceof GrpclbState.LbRpcRetryTask;
+ }
+ };
@Mock
private Helper helper;
@@ -157,6 +165,12 @@ public class GrpclbLoadBalancerTest {
private LoadBalancer roundRobinBalancer;
@Mock
private ObjectPool<ScheduledExecutorService> timerServicePool;
+ @Mock
+ private BackoffPolicy.Provider backoffPolicyProvider;
+ @Mock
+ private BackoffPolicy backoffPolicy1;
+ @Mock
+ private BackoffPolicy backoffPolicy2;
private GrpclbLoadBalancer balancer;
@SuppressWarnings("unchecked")
@@ -238,10 +252,13 @@ public class GrpclbLoadBalancerTest {
when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY);
ScheduledExecutorService timerService = fakeClock.getScheduledExecutorService();
when(timerServicePool.getObject()).thenReturn(timerService);
+ when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
+ when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L);
+ when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
balancer = new GrpclbLoadBalancer(
helper, subchannelPool, pickFirstBalancerFactory, roundRobinBalancerFactory,
timerServicePool,
- timeProvider);
+ timeProvider, backoffPolicyProvider);
verify(subchannelPool).init(same(helper), same(timerService));
}
@@ -1583,6 +1600,108 @@ public class GrpclbLoadBalancerTest {
verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1");
}
+ @Test
+ public void grpclbBalancerStreamRetry() throws Exception {
+ LoadBalanceRequest expectedInitialRequest =
+ LoadBalanceRequest.newBuilder()
+ .setInitialRequest(
+ InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
+ .build();
+ InOrder inOrder =
+ inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
+ List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
+ Attributes grpclbResolutionAttrs = Attributes.newBuilder()
+ .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
+ deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
+
+ assertEquals(1, fakeOobChannels.size());
+ ManagedChannel oobChannel = fakeOobChannels.poll();
+
+ // First balancer RPC
+ inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
+ verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
+ assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
+
+ // Balancer closes it immediately (erroneously)
+ lbResponseObserver.onCompleted();
+ // Will start backoff sequence 1 (10ns)
+ inOrder.verify(backoffPolicyProvider).get();
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
+
+ // Fast-forward to a moment before the retry
+ fakeClock.forwardNanos(9);
+ verifyNoMoreInteractions(mockLbService);
+ // Then time for retry
+ fakeClock.forwardNanos(1);
+ inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ lbRequestObserver = lbRequestObservers.poll();
+ verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
+ assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
+
+ // Balancer closes it with an error.
+ lbResponseObserver.onError(Status.UNAVAILABLE.asException());
+ // Will continue the backoff sequence 1 (100ns)
+ verifyNoMoreInteractions(backoffPolicyProvider);
+ inOrder.verify(backoffPolicy1).nextBackoffNanos();
+ assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
+
+ // Fast-forward to a moment before the retry
+ fakeClock.forwardNanos(100 - 1);
+ verifyNoMoreInteractions(mockLbService);
+ // Then time for retry
+ fakeClock.forwardNanos(1);
+ inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ lbRequestObserver = lbRequestObservers.poll();
+ verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
+ assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
+
+ // Balancer sends initial response.
+ lbResponseObserver.onNext(buildInitialResponse());
+
+ // Then breaks the RPC
+ lbResponseObserver.onError(Status.UNAVAILABLE.asException());
+
+ // Will reset the retry sequence and retry immediately, because balancer has responded.
+ inOrder.verify(backoffPolicyProvider).get();
+ inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ lbRequestObserver = lbRequestObservers.poll();
+ verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
+
+ // Fail the retry after spending 4ns
+ fakeClock.forwardNanos(4);
+ lbResponseObserver.onError(Status.UNAVAILABLE.asException());
+
+ // Will be on the first retry (10ns) of backoff sequence 2.
+ inOrder.verify(backoffPolicy2).nextBackoffNanos();
+ assertEquals(1, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
+
+ // Fast-forward to a moment before the retry, the time spent in the last try is deducted.
+ fakeClock.forwardNanos(10 - 4 - 1);
+ verifyNoMoreInteractions(mockLbService);
+ // Then time for retry
+ fakeClock.forwardNanos(1);
+ inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ assertEquals(1, lbRequestObservers.size());
+ lbRequestObserver = lbRequestObservers.poll();
+ verify(lbRequestObserver).onNext(eq(expectedInitialRequest));
+ assertEquals(0, fakeClock.numPendingTasks(LB_RPC_RETRY_TASK_FILTER));
+
+ // Wrapping up
+ verify(backoffPolicyProvider, times(2)).get();
+ verify(backoffPolicy1, times(2)).nextBackoffNanos();
+ verify(backoffPolicy2, times(1)).nextBackoffNanos();
+ }
+
private void deliverSubchannelState(
final Subchannel subchannel, final ConnectivityStateInfo newState) {
channelExecutor.execute(new Runnable() {