From 6046da4a232df87b4d73aa53874b0dcf8d3025c8 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 24 Oct 2018 10:21:37 -0700 Subject: grpclb: enter fallback when LB stream broken even before fallback timer expires (#4990) (#4997) Previously the client waited ~10 seconds until the fallback timer expired. While the timer is useful to address the long tail, it shouldn't delay using the fallback in case of obvious errors, like the channel failing to connect or an UNIMPLEMENTED response. This is a cherry-pick of b701e8920daaccf9e6eb2916d2ea94da8df74be5 from master. --- .../src/main/java/io/grpc/grpclb/GrpclbState.java | 6 -- .../io/grpc/grpclb/GrpclbLoadBalancerTest.java | 81 +++++++++++++++------- 2 files changed, 56 insertions(+), 31 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 95ad81341..c583e839b 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -215,9 +215,6 @@ final class GrpclbState { if (usingFallbackBackends) { return; } - if (fallbackTimer != null && !fallbackTimer.discarded) { - return; - } int numReadySubchannels = 0; for (Subchannel subchannel : subchannels.values()) { if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { @@ -392,7 +389,6 @@ final class GrpclbState { @VisibleForTesting class FallbackModeTask implements Runnable { private ScheduledFuture scheduledFuture; - private boolean discarded; @Override public void run() { @@ -400,7 +396,6 @@ final class GrpclbState { @Override public void run() { checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch"); - discarded = true; maybeUseFallbackBackends(); maybeUpdatePicker(); } @@ -408,7 +403,6 @@ final class GrpclbState { } void cancel() { - discarded = true; scheduledFuture.cancel(false); } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java 
b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index e54052810..9c394893a 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -43,7 +43,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.protobuf.util.Durations; @@ -1077,29 +1076,6 @@ public class GrpclbLoadBalancerTest { fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - ///////////////////////////////////////////// - // Break the LB stream before timer expires - ///////////////////////////////////////////// - Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); - lbResponseObserver.onError(streamError.asException()); - // Not in fallback mode. The error will be propagated. 
- verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.dropList).isEmpty(); - ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList); - Status status = errorEntry.result.getStatus(); - assertThat(status.getCode()).isEqualTo(streamError.getCode()); - assertThat(status.getDescription()).contains(streamError.getDescription()); - // A new stream is created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - ////////////////////////////////// // Fallback timer expires (or not) ////////////////////////////////// @@ -1156,13 +1132,14 @@ public class GrpclbLoadBalancerTest { // Break the LB stream after the timer expires //////////////////////////////////////////////// if (timerExpires) { + Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); lbResponseObserver.onError(streamError.asException()); // The error will NOT propagate to picker because fallback list is in use. 
inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); // A new stream is created - verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); @@ -1197,6 +1174,60 @@ public class GrpclbLoadBalancerTest { assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); } + @Test + public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { + long loadReportIntervalMillis = 1983; + InOrder inOrder = inOrder(helper, subchannelPool); + + // Create a resolution list with a mixture of balancer and backend addresses + List resolutionList = + createResolvedServerAddresses(false, true, false); + Attributes resolutionAttrs = Attributes.EMPTY; + deliverResolvedAddresses(resolutionList, resolutionAttrs); + + inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + + // Attempted to connect to balancer + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if runSerialized() has been run. 
+ inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); + + inOrder.verifyNoMoreInteractions(); + + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + ///////////////////////////////////////////// + // Break the LB stream before timer expires + ///////////////////////////////////////////// + Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); + lbResponseObserver.onError(streamError.asException()); + + // Fall back to the backends from resolver + fallbackTestVerifyUseOfFallbackBackendLists( + inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + + // A new stream is created + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } + @Test public void grpclbFallback_balancerLost() { subtestGrpclbFallbackConnectionLost(true, false); -- cgit v1.2.3