aboutsummaryrefslogtreecommitdiff
path: root/grpclb
diff options
context:
space:
mode:
authorKun Zhang <zhangkun83@users.noreply.github.com>2017-11-29 10:50:09 -0800
committerGitHub <noreply@github.com>2017-11-29 10:50:09 -0800
commit9239984a8ac4c30253326dd4c0d2c95bb71138ed (patch)
tree4d024f88810de494dc472f818d4ad942b95fbc0c /grpclb
parent3671af2c495f95700dffa24d151143dd9200f5e5 (diff)
downloadgrpc-grpc-java-9239984a8ac4c30253326dd4c0d2c95bb71138ed.tar.gz
grpclb: switch to fallback mode if all connections are lost (#3744)
Previously fallback mode can be entered only if the client has not received any server list and the fallback timeout has expired. Now the fallback timer is started when the stream to the balancer is broken AND there is no ready Subchannels. Fallback mode is activated when the timer expires. When a new server list is received from the balancer, either the fallback timer is cancelled, or fallback mode is exited. Also fixed a bug that the fallback timer should've been cancelled when GrpcState is shut down.
Diffstat (limited to 'grpclb')
-rw-r--r--grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java7
-rw-r--r--grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java138
-rw-r--r--grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java244
3 files changed, 314 insertions, 75 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
index 2ef6a1901..3b6b1eeee 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
@@ -138,12 +138,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
delegate.handleResolvedAddressGroups(newBackendServers, attributes);
break;
case GRPCLB:
- if (newLbAddressGroups.isEmpty()) {
- grpclbState.propagateError(Status.UNAVAILABLE.withDescription(
- "NameResolver returned no LB address while asking for GRPCLB"));
- } else {
- grpclbState.updateAddresses(newLbAddressGroups, newBackendServers);
- }
+ grpclbState.handleAddresses(newLbAddressGroups, newBackendServers);
break;
default:
// Do nothing
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
index 301f65985..a82d29c17 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -101,12 +101,14 @@ final class GrpclbState {
private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
- // Once set, never go back to null.
+ // Reset to null when timer expires or cancelled.
@Nullable
- private ScheduledFuture<?> fallbackTimer;
+ private FallbackModeTask fallbackTimer;
private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
- private boolean fallbackTimerExpired;
- private boolean receivedServerListFromBalancer;
+ private boolean usingFallbackBackends;
+ // True if the current balancer has returned a serverlist. Will be reset to false when lost
+ // connection to a balancer.
+ private boolean balancerWorking;
@Nullable
private ManagedChannel lbCommChannel;
@@ -141,14 +143,21 @@ final class GrpclbState {
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).set(newState);
+ maybeStartFallbackTimer();
maybeUpdatePicker();
}
/**
- * Set the new addresses of the balancer and backends, and create connection if not yet connected.
+ * Handle new addresses of the balancer and backends from the resolver, and create connection if
+ * not yet connected.
*/
- void updateAddresses(
+ void handleAddresses(
List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) {
+ if (newLbAddressGroups.isEmpty()) {
+ propagateError(Status.UNAVAILABLE.withDescription(
+ "NameResolver returned no LB address while asking for GRPCLB"));
+ return;
+ }
LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups);
startLbComm(newLbAddressGroup);
// Avoid creating a new RPC just because the addresses were updated, as it can cause a
@@ -158,30 +167,59 @@ final class GrpclbState {
if (lbStream == null) {
startLbRpc();
}
- // If we don't receive server list from the balancer within the timeout, we round-robin on
- // the backend list from the resolver (aka fallback), until the balancer returns a server list.
fallbackBackendList = newBackendServers;
- if (fallbackTimer == null) {
- fallbackTimer =
- timerService.schedule(new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } else {
- maybeUseFallbackBackends();
+ maybeStartFallbackTimer();
+ if (usingFallbackBackends) {
+ // Populate the new fallback backends to round-robin list.
+ useFallbackBackends();
}
+ maybeUpdatePicker();
}
- private void maybeUseFallbackBackends() {
- // Only use fallback backends after fallback timer expired and before receiving server list from
- // the balancer.
- if (receivedServerListFromBalancer || !fallbackTimerExpired) {
+ /**
+ * Start the fallback timer if it's not already started and all connections are lost.
+ */
+ private void maybeStartFallbackTimer() {
+ if (fallbackTimer != null) {
return;
}
+ if (fallbackBackendList.isEmpty()) {
+ return;
+ }
+ if (balancerWorking) {
+ return;
+ }
+ if (usingFallbackBackends) {
+ return;
+ }
+ int numReadySubchannels = 0;
+ for (Subchannel subchannel : subchannels.values()) {
+ if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
+ numReadySubchannels++;
+ }
+ }
+ if (numReadySubchannels > 0) {
+ return;
+ }
+ logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId});
+ fallbackTimer = new FallbackModeTask();
+ fallbackTimer.schedule();
+ }
+
+ /**
+ * Populate the round-robin lists with the fallback backends.
+ */
+ private void useFallbackBackends() {
+ usingFallbackBackends = true;
+ logger.log(Level.INFO, "[{0}] Using fallback: {1}", new Object[] {logId, fallbackBackendList});
+
List<DropEntry> newDropList = new ArrayList<DropEntry>();
List<BackendAddressGroup> newBackendAddrList = new ArrayList<BackendAddressGroup>();
for (EquivalentAddressGroup eag : fallbackBackendList) {
newDropList.add(null);
newBackendAddrList.add(new BackendAddressGroup(eag, null));
}
- updateRoundRobinLists(newDropList, newBackendAddrList, null);
+ useRoundRobinLists(newDropList, newBackendAddrList, null);
}
private void shutdownLbComm() {
@@ -231,12 +269,20 @@ final class GrpclbState {
}
}
+ private void cancelFallbackTimer() {
+ if (fallbackTimer != null) {
+ fallbackTimer.cancel();
+ fallbackTimer = null;
+ }
+ }
+
void shutdown() {
shutdownLbComm();
for (Subchannel subchannel : subchannels.values()) {
subchannel.shutdown();
}
subchannels = Collections.emptyMap();
+ cancelFallbackTimer();
}
void propagateError(Status status) {
@@ -257,7 +303,10 @@ final class GrpclbState {
return lbStream.loadRecorder;
}
- private void updateRoundRobinLists(
+ /**
+ * Populate the round-robin lists with the given values.
+ */
+ private void useRoundRobinLists(
List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList,
@Nullable GrpclbClientLoadRecorder loadRecorder) {
HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap =
@@ -301,21 +350,39 @@ final class GrpclbState {
subchannels = Collections.unmodifiableMap(newSubchannelMap);
dropList = Collections.unmodifiableList(newDropList);
backendList = Collections.unmodifiableList(newBackendList);
- maybeUpdatePicker();
}
@VisibleForTesting
class FallbackModeTask implements Runnable {
+ private ScheduledFuture<?> scheduledFuture;
+ // If the scheduledFuture is cancelled after the task has made it into the ChannelExecutor, the
+ // task will be started anyway. Use this boolean to signal that the task should not be run.
+ private boolean cancelled;
+
@Override
public void run() {
helper.runSerialized(new Runnable() {
@Override
public void run() {
- fallbackTimerExpired = true;
- maybeUseFallbackBackends();
+ if (!cancelled) {
+ checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch");
+ fallbackTimer = null;
+ useFallbackBackends();
+ maybeUpdatePicker();
+ }
}
});
}
+
+ void cancel() {
+ scheduledFuture.cancel(false);
+ cancelled = true;
+ }
+
+ void schedule() {
+ checkState(scheduledFuture == null, "FallbackModeTask already scheduled");
+ scheduledFuture = timerService.schedule(this, FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ }
}
@VisibleForTesting
@@ -443,10 +510,7 @@ final class GrpclbState {
return;
}
- receivedServerListFromBalancer = true;
- if (fallbackTimer != null) {
- fallbackTimer.cancel(false);
- }
+ balancerWorking = true;
// TODO(zhangkun83): handle delegate from initialResponse
ServerList serverList = response.getServerList();
List<DropEntry> newDropList = new ArrayList<DropEntry>();
@@ -470,16 +534,23 @@ final class GrpclbState {
newBackendAddrList.add(new BackendAddressGroup(eag, token));
}
}
- updateRoundRobinLists(newDropList, newBackendAddrList, loadRecorder);
+ // Stop using fallback backends as soon as a new server list is received from the balancer.
+ usingFallbackBackends = false;
+ cancelFallbackTimer();
+ useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder);
+ maybeUpdatePicker();
}
- private void handleStreamClosed(Status status) {
+ private void handleStreamClosed(Status error) {
+ checkArgument(!error.isOk(), "unexpected OK status");
if (closed) {
return;
}
closed = true;
cleanUp();
- propagateError(status);
+ propagateError(error);
+ balancerWorking = false;
+ maybeStartFallbackTimer();
startLbRpc();
}
@@ -705,7 +776,7 @@ final class GrpclbState {
@VisibleForTesting
static final class ErrorEntry implements RoundRobinEntry {
- private final PickResult result;
+ final PickResult result;
ErrorEntry(Status status) {
result = PickResult.withError(status);
@@ -728,6 +799,13 @@ final class GrpclbState {
}
return Objects.equal(result, ((ErrorEntry) other).result);
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("result", result)
+ .toString();
+ }
}
@VisibleForTesting
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index fc3beab67..01958748e 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Durations;
@@ -119,6 +120,7 @@ public class GrpclbLoadBalancerTest {
@Mock
private Helper helper;
+ private SubchannelPicker currentPicker;
private LoadBalancerGrpc.LoadBalancerImplBase mockLbService;
@Captor
private ArgumentCaptor<StreamObserver<LoadBalanceResponse>> lbResponseObserverCaptor;
@@ -220,6 +222,14 @@ public class GrpclbLoadBalancerTest {
return null;
}
}).when(helper).runSerialized(any(Runnable.class));
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ currentPicker = (SubchannelPicker) invocation.getArguments()[1];
+ return null;
+ }
+ }).when(helper).updateBalancingState(
+ any(ConnectivityState.class), any(SubchannelPicker.class));
when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY);
when(timerServicePool.getObject()).thenReturn(fakeClock.getScheduledExecutorService());
balancer = new GrpclbLoadBalancer(
@@ -247,6 +257,8 @@ public class GrpclbLoadBalancerTest {
for (Subchannel subchannel: subchannelTracker) {
verify(subchannel).shutdown();
}
+ // No timer should linger after shutdown
+ assertEquals(0, fakeClock.numPendingTasks());
} finally {
if (fakeLbServer != null) {
fakeLbServer.shutdownNow();
@@ -358,6 +370,10 @@ public class GrpclbLoadBalancerTest {
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
+
+ // No backend address from resolver. Fallback timer is not started.
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
@@ -1018,8 +1034,8 @@ public class GrpclbLoadBalancerTest {
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
- // Timer for fallback mode is registered
- assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ // No backend address from resolver. Fallback timer is not started.
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
@@ -1034,8 +1050,6 @@ public class GrpclbLoadBalancerTest {
eq(new EquivalentAddressGroup(backends1.get(0).addr)), any(Attributes.class));
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends1.get(1).addr)), any(Attributes.class));
- // Timer for fallback mode is cancelled as soon as the balancer returns a server list
- assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
@@ -1180,11 +1194,22 @@ public class GrpclbLoadBalancerTest {
verify(lbRequestObserver, never()).onError(any(Throwable.class));
// Load reporting was not requested, thus never scheduled
- assertEquals(0, fakeClock.numPendingTasks());
+ assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
+ }
+
+ @Test
+ public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() {
+ subtestGrpclbFallbackInitialTimeout(false);
}
@Test
- public void grpclbFallbackToBackendsFromResolver() {
+ public void grpclbFallback_initialTimeout_timerExpires() {
+ subtestGrpclbFallbackInitialTimeout(true);
+ }
+
+ // Fallback within the period of the initial timeout, where the server list is not received from
+ // the balancer.
+ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
long loadReportIntervalMillis = 1983;
InOrder helperInOrder = inOrder(helper);
@@ -1211,29 +1236,29 @@ public class GrpclbLoadBalancerTest {
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
- // Receiving the initial response won't reset the fallback timer. Only reciving the server list
- // does.
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
// We don't care if runSerialized() has been run.
helperInOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
helperInOrder.verifyNoMoreInteractions();
- ////////////////////////////
- // Fallback timer expires
- ////////////////////////////
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
- fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
- assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
-
- // Fall back to the backends from resolver
- fallbackTestVerifyUseOfFallbackBackendLists(
- helperInOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
- assertNull(balancer.getDelegate());
- assertFalse(oobChannel.isShutdown());
- verify(lbRequestObserver, never()).onCompleted();
+ //////////////////////////////////
+ // Fallback timer expires (or not)
+ //////////////////////////////////
+ if (timerExpires) {
+ fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ // Fall back to the backends from resolver
+ fallbackTestVerifyUseOfFallbackBackendLists(
+ helperInOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
+
+ assertNull(balancer.getDelegate());
+ assertFalse(oobChannel.isShutdown());
+ verify(lbRequestObserver, never()).onCompleted();
+ }
////////////////////////////////////////////////////////
// Name resolver sends new list without any backend addr
@@ -1250,9 +1275,11 @@ public class GrpclbLoadBalancerTest {
resolutionList.get(0).getAddresses().get(0),
resolutionList.get(1).getAddresses().get(0)))));
- // Still in fallback logic, except that the backend list is empty
- fallbackTestVerifyUseOfFallbackBackendLists(
- helperInOrder, helper, Collections.<EquivalentAddressGroup>emptyList());
+ if (timerExpires) {
+ // Still in fallback logic, except that the backend list is empty
+ fallbackTestVerifyUseOfFallbackBackendLists(
+ helperInOrder, helper, Collections.<EquivalentAddressGroup>emptyList());
+ }
//////////////////////////////////////////////////
// Name resolver sends new list with backend addrs
@@ -1266,18 +1293,32 @@ public class GrpclbLoadBalancerTest {
same(oobChannel),
addrsEq(resolutionList.get(0)));
- // New backend addresses are used for fallback
- fallbackTestVerifyUseOfFallbackBackendLists(
- helperInOrder, helper, Arrays.asList(resolutionList.get(1), resolutionList.get(2)));
+ if (timerExpires) {
+ // New backend addresses are used for fallback
+ fallbackTestVerifyUseOfFallbackBackendLists(
+ helperInOrder, helper, Arrays.asList(resolutionList.get(1), resolutionList.get(2)));
+ }
///////////////////////
// Break the LB stream
///////////////////////
- lbResponseObserver.onError(Status.UNAVAILABLE.asException());
+ Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
+ lbResponseObserver.onError(streamError.asException());
- // The error will NOT propagate to picker because fallback list is in use.
- helperInOrder.verify(helper, never())
- .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
+ if (timerExpires) {
+ // The error will NOT propagate to picker because fallback list is in use.
+ helperInOrder.verify(helper, never())
+ .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
+ } else {
+ // Not in fallback mode. The error will be propagated.
+ verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
+ RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
+ assertThat(picker.dropList).isEmpty();
+ ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList);
+ Status status = errorEntry.result.getStatus();
+ assertThat(status.getCode()).isEqualTo(streamError.getCode());
+ assertThat(status.getDescription()).contains(streamError.getDescription());
+ }
// A new stream is created
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
@@ -1298,7 +1339,7 @@ public class GrpclbLoadBalancerTest {
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(serverList));
- // Fallback mode ends
+ // Balancer-provided server list now in effect
fallbackTestVerifyUseOfBalancerBackendLists(helperInOrder, helper, serverList);
///////////////////////////////////////////////////////////////
@@ -1311,16 +1352,141 @@ public class GrpclbLoadBalancerTest {
helperInOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
- // Fallback mode is one-shot only.
+ // No fallback timeout timer scheduled.
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ }
+
+ @Test
+ public void grpclbFallback_balancerLost_timerExpires() {
+ subtestGrpclbFallbackConnectionLost(true, false, true);
+ }
+
+ @Test
+ public void grpclbFallback_subchannelsLost_timerExpires() {
+ subtestGrpclbFallbackConnectionLost(false, true, true);
+ }
+
+ @Test
+ public void grpclbFallback_allLost_timerExpires() {
+ subtestGrpclbFallbackConnectionLost(true, true, true);
+ }
+
+ @Test
+ public void grpclbFallback_allLost_ResumeBeforeTimerExpires() {
+ subtestGrpclbFallbackConnectionLost(true, true, false);
+ }
+
+ // Fallback outside of the initial timeout, where all connections are lost.
+ private void subtestGrpclbFallbackConnectionLost(
+ boolean balancerBroken, boolean allSubchannelsBroken, boolean timerExpires) {
+ long loadReportIntervalMillis = 1983;
+ InOrder inOrder = inOrder(helper, mockLbService);
+
+ // Create a resolution list with a mixture of balancer and backend addresses
+ List<EquivalentAddressGroup> resolutionList =
+ createResolvedServerAddresses(false, true, false);
+ Attributes resolutionAttrs = Attributes.newBuilder()
+ .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
+ deliverResolvedAddresses(resolutionList, resolutionAttrs);
+
+ assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
+ inOrder.verify(helper).createOobChannel(
+ addrsEq(resolutionList.get(1)), eq(lbAuthority(0)));
+
+ // Attempted to connect to balancer
+ assertEquals(1, fakeOobChannels.size());
+ ManagedChannel oobChannel = fakeOobChannels.poll();
+ inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
+
+ verify(lbRequestObserver).onNext(
+ eq(LoadBalanceRequest.newBuilder().setInitialRequest(
+ InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
+ .build()));
+ lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
+ // We don't care if runSerialized() has been run.
+ inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
+ inOrder.verifyNoMoreInteractions();
+
+ // Balancer returns a server list
+ List<ServerEntry> serverList = Arrays.asList(
+ new ServerEntry("127.0.0.1", 2000, "token0001"),
+ new ServerEntry("127.0.0.1", 2010, "token0002"));
+ lbResponseObserver.onNext(buildInitialResponse());
+ lbResponseObserver.onNext(buildLbResponse(serverList));
+
+ // No fallback timer scheduled
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+
+ List<Subchannel> subchannels =
+ fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList);
+
+ // Break connections
+ if (balancerBroken) {
+ lbResponseObserver.onError(Status.UNAVAILABLE.asException());
+ // A new stream to LB is created
+ inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ lbRequestObserver = lbRequestObservers.poll();
+ }
+ if (allSubchannelsBroken) {
+ for (Subchannel subchannel : subchannels) {
+ // A READY subchannel transits to IDLE when receiving a go-away
+ deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
+ }
+ }
+
+ if (balancerBroken && allSubchannelsBroken) {
+ // Fallback timer is scheduled if all connections are lost.
+ assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ if (timerExpires) {
+ fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+
+ // Going into fallback
+ subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
+ inOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
+
+ // When in fallback mode, fallback timer should not be scheduled when all backend
+ // connections are lost
+ for (Subchannel subchannel : subchannels) {
+ deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
+ }
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ } else {
+ fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
+ }
+
+ // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
+ List<ServerEntry> serverList2 = Arrays.asList(
+ new ServerEntry("127.0.0.1", 2001, "token0003"),
+ new ServerEntry("127.0.0.1", 2011, "token0004"));
+ lbResponseObserver.onNext(buildInitialResponse());
+ lbResponseObserver.onNext(buildLbResponse(serverList2));
+
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ if (timerExpires) {
+ fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList2);
+ }
+ } else {
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ }
+
+ if (!(balancerBroken && allSubchannelsBroken && timerExpires)) {
+ verify(helper, never()).createSubchannel(eq(resolutionList.get(0)), any(Attributes.class));
+ verify(helper, never()).createSubchannel(eq(resolutionList.get(2)), any(Attributes.class));
+ }
}
- private void fallbackTestVerifyUseOfFallbackBackendLists(
+ private List<Subchannel> fallbackTestVerifyUseOfFallbackBackendLists(
InOrder inOrder, Helper helper, List<EquivalentAddressGroup> addrs) {
- fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, null);
+ return fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, null);
}
- private void fallbackTestVerifyUseOfBalancerBackendLists(
+ private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists(
InOrder inOrder, Helper helper, List<ServerEntry> servers) {
ArrayList<EquivalentAddressGroup> addrs = new ArrayList<EquivalentAddressGroup>();
ArrayList<String> tokens = new ArrayList<String>();
@@ -1328,10 +1494,10 @@ public class GrpclbLoadBalancerTest {
addrs.add(new EquivalentAddressGroup(server.addr));
tokens.add(server.token);
}
- fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, tokens);
+ return fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, tokens);
}
- private void fallbackTestVerifyUseOfBackendLists(
+ private List<Subchannel> fallbackTestVerifyUseOfBackendLists(
InOrder inOrder, Helper helper, List<EquivalentAddressGroup> addrs,
@Nullable List<String> tokens) {
if (tokens != null) {
@@ -1340,8 +1506,7 @@ public class GrpclbLoadBalancerTest {
for (EquivalentAddressGroup addr : addrs) {
inOrder.verify(helper).createSubchannel(addrsEq(addr), any(Attributes.class));
}
- inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
- RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
+ RoundRobinPicker picker = (RoundRobinPicker) currentPicker;
assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY);
assertEquals(addrs.size(), mockSubchannels.size());
@@ -1374,6 +1539,7 @@ public class GrpclbLoadBalancerTest {
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
}
+ return subchannels;
}
@Test