diff options
Diffstat (limited to 'grpclb')
-rw-r--r-- | grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java | 6 | ||||
-rw-r--r-- | grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java | 81 |
2 files changed, 56 insertions, 31 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 95ad81341..c583e839b 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -215,9 +215,6 @@ final class GrpclbState { if (usingFallbackBackends) { return; } - if (fallbackTimer != null && !fallbackTimer.discarded) { - return; - } int numReadySubchannels = 0; for (Subchannel subchannel : subchannels.values()) { if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { @@ -392,7 +389,6 @@ final class GrpclbState { @VisibleForTesting class FallbackModeTask implements Runnable { private ScheduledFuture<?> scheduledFuture; - private boolean discarded; @Override public void run() { @@ -400,7 +396,6 @@ final class GrpclbState { @Override public void run() { checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch"); - discarded = true; maybeUseFallbackBackends(); maybeUpdatePicker(); } @@ -408,7 +403,6 @@ final class GrpclbState { } void cancel() { - discarded = true; scheduledFuture.cancel(false); } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index e54052810..9c394893a 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -43,7 +43,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.protobuf.util.Durations; @@ -1077,29 +1076,6 @@ public class GrpclbLoadBalancerTest { fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - ///////////////////////////////////////////// - // Break the LB stream before timer expires - ///////////////////////////////////////////// - Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); - lbResponseObserver.onError(streamError.asException()); - // Not in fallback mode. The error will be propagated. - verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.dropList).isEmpty(); - ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList); - Status status = errorEntry.result.getStatus(); - assertThat(status.getCode()).isEqualTo(streamError.getCode()); - assertThat(status.getDescription()).contains(streamError.getDescription()); - // A new stream is created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - ////////////////////////////////// // Fallback timer expires (or not) ////////////////////////////////// @@ -1156,13 +1132,14 @@ public class GrpclbLoadBalancerTest { // Break the LB stream after the timer expires //////////////////////////////////////////////// if (timerExpires) { + Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); lbResponseObserver.onError(streamError.asException()); // The error will NOT propagate to picker because fallback list is in use. inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); // A new stream is created - verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); @@ -1198,6 +1175,60 @@ public class GrpclbLoadBalancerTest { } @Test + public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { + long loadReportIntervalMillis = 1983; + InOrder inOrder = inOrder(helper, subchannelPool); + + // Create a resolution list with a mixture of balancer and backend addresses + List<EquivalentAddressGroup> resolutionList = + createResolvedServerAddresses(false, true, false); + Attributes resolutionAttrs = Attributes.EMPTY; + deliverResolvedAddresses(resolutionList, resolutionAttrs); + + inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + + // Attempted to connect to balancer + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if runSerialized() has been run. + inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); + + inOrder.verifyNoMoreInteractions(); + + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + ///////////////////////////////////////////// + // Break the LB stream before timer expires + ///////////////////////////////////////////// + Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); + lbResponseObserver.onError(streamError.asException()); + + // Fall back to the backends from resolver + fallbackTestVerifyUseOfFallbackBackendLists( + inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + + // A new stream is created + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } + + @Test public void grpclbFallback_balancerLost() { subtestGrpclbFallbackConnectionLost(true, false); } |