aboutsummaryrefslogtreecommitdiff
path: root/grpclb
diff options
context:
space:
mode:
Diffstat (limited to 'grpclb')
-rw-r--r--grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java6
-rw-r--r--grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java81
2 files changed, 56 insertions, 31 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
index 95ad81341..c583e839b 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -215,9 +215,6 @@ final class GrpclbState {
if (usingFallbackBackends) {
return;
}
- if (fallbackTimer != null && !fallbackTimer.discarded) {
- return;
- }
int numReadySubchannels = 0;
for (Subchannel subchannel : subchannels.values()) {
if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
@@ -392,7 +389,6 @@ final class GrpclbState {
@VisibleForTesting
class FallbackModeTask implements Runnable {
private ScheduledFuture<?> scheduledFuture;
- private boolean discarded;
@Override
public void run() {
@@ -400,7 +396,6 @@ final class GrpclbState {
@Override
public void run() {
checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch");
- discarded = true;
maybeUseFallbackBackends();
maybeUpdatePicker();
}
@@ -408,7 +403,6 @@ final class GrpclbState {
}
void cancel() {
- discarded = true;
scheduledFuture.cancel(false);
}
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index e54052810..9c394893a 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -43,7 +43,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Durations;
@@ -1077,29 +1076,6 @@ public class GrpclbLoadBalancerTest {
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
- /////////////////////////////////////////////
- // Break the LB stream before timer expires
- /////////////////////////////////////////////
- Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
- lbResponseObserver.onError(streamError.asException());
- // Not in fallback mode. The error will be propagated.
- verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
- RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
- assertThat(picker.dropList).isEmpty();
- ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList);
- Status status = errorEntry.result.getStatus();
- assertThat(status.getCode()).isEqualTo(streamError.getCode());
- assertThat(status.getDescription()).contains(streamError.getDescription());
- // A new stream is created
- verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
- lbResponseObserver = lbResponseObserverCaptor.getValue();
- assertEquals(1, lbRequestObservers.size());
- lbRequestObserver = lbRequestObservers.poll();
- verify(lbRequestObserver).onNext(
- eq(LoadBalanceRequest.newBuilder().setInitialRequest(
- InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
- .build()));
-
//////////////////////////////////
// Fallback timer expires (or not)
//////////////////////////////////
@@ -1156,13 +1132,14 @@ public class GrpclbLoadBalancerTest {
// Break the LB stream after the timer expires
////////////////////////////////////////////////
if (timerExpires) {
+ Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
lbResponseObserver.onError(streamError.asException());
// The error will NOT propagate to picker because fallback list is in use.
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
// A new stream is created
- verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture());
+ verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
@@ -1198,6 +1175,60 @@ public class GrpclbLoadBalancerTest {
}
@Test
+ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() {
+ long loadReportIntervalMillis = 1983;
+ InOrder inOrder = inOrder(helper, subchannelPool);
+
+ // Create a resolution list with a mixture of balancer and backend addresses
+ List<EquivalentAddressGroup> resolutionList =
+ createResolvedServerAddresses(false, true, false);
+ Attributes resolutionAttrs = Attributes.EMPTY;
+ deliverResolvedAddresses(resolutionList, resolutionAttrs);
+
+ inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
+
+ // Attempted to connect to balancer
+ assertEquals(1, fakeOobChannels.size());
+ ManagedChannel oobChannel = fakeOobChannels.poll();
+ verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
+
+ verify(lbRequestObserver).onNext(
+ eq(LoadBalanceRequest.newBuilder().setInitialRequest(
+ InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
+ .build()));
+ lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
+ // We don't care if runSerialized() has been run.
+ inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
+
+ inOrder.verifyNoMoreInteractions();
+
+ assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+
+ /////////////////////////////////////////////
+ // Break the LB stream before timer expires
+ /////////////////////////////////////////////
+ Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
+ lbResponseObserver.onError(streamError.asException());
+
+ // Fall back to the backends from resolver
+ fallbackTestVerifyUseOfFallbackBackendLists(
+ inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
+
+ // A new stream is created
+ verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
+ lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ lbRequestObserver = lbRequestObservers.poll();
+ verify(lbRequestObserver).onNext(
+ eq(LoadBalanceRequest.newBuilder().setInitialRequest(
+ InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
+ .build()));
+ }
+
+ @Test
public void grpclbFallback_balancerLost() {
subtestGrpclbFallbackConnectionLost(true, false);
}