aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Kun Zhang <zhangkun83@users.noreply.github.com> 2018-10-24 10:21:37 -0700
committer GitHub <noreply@github.com> 2018-10-24 10:21:37 -0700
commit 6046da4a232df87b4d73aa53874b0dcf8d3025c8 (patch)
tree 67853672669d74dc89144f1885bc9ec3affeb50f
parent 0981ba68ee5e9217506cfcca7ff730c056f5c92d (diff)
download grpc-grpc-java-6046da4a232df87b4d73aa53874b0dcf8d3025c8.tar.gz
grpclb: enter fallback when LB stream broken even before fallback timer expires (#4990) (#4997)
Previously the client waited ~10 seconds until the fallback timer expired. While the timer is useful to address the long tail, it shouldn't delay using the fallback in case of obvious errors, like the channel failing to connect or an UNIMPLEMENTED response. This is a cherry-pick of b701e8920daaccf9e6eb2916d2ea94da8df74be5 from master.
-rw-r--r-- grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java 6
-rw-r--r-- grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java 81
2 files changed, 56 insertions, 31 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
index 95ad81341..c583e839b 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -215,9 +215,6 @@ final class GrpclbState {
if (usingFallbackBackends) {
return;
}
- if (fallbackTimer != null && !fallbackTimer.discarded) {
- return;
- }
int numReadySubchannels = 0;
for (Subchannel subchannel : subchannels.values()) {
if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
@@ -392,7 +389,6 @@ final class GrpclbState {
@VisibleForTesting
class FallbackModeTask implements Runnable {
private ScheduledFuture<?> scheduledFuture;
- private boolean discarded;
@Override
public void run() {
@@ -400,7 +396,6 @@ final class GrpclbState {
@Override
public void run() {
checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch");
- discarded = true;
maybeUseFallbackBackends();
maybeUpdatePicker();
}
@@ -408,7 +403,6 @@ final class GrpclbState {
}
void cancel() {
- discarded = true;
scheduledFuture.cancel(false);
}
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index e54052810..9c394893a 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -43,7 +43,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Durations;
@@ -1077,29 +1076,6 @@ public class GrpclbLoadBalancerTest {
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
- /////////////////////////////////////////////
- // Break the LB stream before timer expires
- /////////////////////////////////////////////
- Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
- lbResponseObserver.onError(streamError.asException());
- // Not in fallback mode. The error will be propagated.
- verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
- RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
- assertThat(picker.dropList).isEmpty();
- ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList);
- Status status = errorEntry.result.getStatus();
- assertThat(status.getCode()).isEqualTo(streamError.getCode());
- assertThat(status.getDescription()).contains(streamError.getDescription());
- // A new stream is created
- verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
- lbResponseObserver = lbResponseObserverCaptor.getValue();
- assertEquals(1, lbRequestObservers.size());
- lbRequestObserver = lbRequestObservers.poll();
- verify(lbRequestObserver).onNext(
- eq(LoadBalanceRequest.newBuilder().setInitialRequest(
- InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
- .build()));
-
//////////////////////////////////
// Fallback timer expires (or not)
//////////////////////////////////
@@ -1156,13 +1132,14 @@ public class GrpclbLoadBalancerTest {
// Break the LB stream after the timer expires
////////////////////////////////////////////////
if (timerExpires) {
+ Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
lbResponseObserver.onError(streamError.asException());
// The error will NOT propagate to picker because fallback list is in use.
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
// A new stream is created
- verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture());
+ verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
@@ -1198,6 +1175,60 @@ public class GrpclbLoadBalancerTest {
}
@Test
+ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() {
+ long loadReportIntervalMillis = 1983;
+ InOrder inOrder = inOrder(helper, subchannelPool);
+
+ // Create a resolution list with a mixture of balancer and backend addresses
+ List<EquivalentAddressGroup> resolutionList =
+ createResolvedServerAddresses(false, true, false);
+ Attributes resolutionAttrs = Attributes.EMPTY;
+ deliverResolvedAddresses(resolutionList, resolutionAttrs);
+
+ inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
+
+ // Attempted to connect to balancer
+ assertEquals(1, fakeOobChannels.size());
+ ManagedChannel oobChannel = fakeOobChannels.poll();
+ verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
+
+ verify(lbRequestObserver).onNext(
+ eq(LoadBalanceRequest.newBuilder().setInitialRequest(
+ InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
+ .build()));
+ lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
+ // We don't care if runSerialized() has been run.
+ inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
+
+ inOrder.verifyNoMoreInteractions();
+
+ assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+
+ /////////////////////////////////////////////
+ // Break the LB stream before timer expires
+ /////////////////////////////////////////////
+ Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
+ lbResponseObserver.onError(streamError.asException());
+
+ // Fall back to the backends from resolver
+ fallbackTestVerifyUseOfFallbackBackendLists(
+ inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
+
+ // A new stream is created
+ verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
+ lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ lbRequestObserver = lbRequestObservers.poll();
+ verify(lbRequestObserver).onNext(
+ eq(LoadBalanceRequest.newBuilder().setInitialRequest(
+ InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
+ .build()));
+ }
+
+ @Test
public void grpclbFallback_balancerLost() {
subtestGrpclbFallbackConnectionLost(true, false);
}