aboutsummaryrefslogtreecommitdiff
path: root/grpclb
diff options
context:
space:
mode:
authorKun Zhang <zhangkun83@users.noreply.github.com>2017-09-12 12:25:08 -0700
committerGitHub <noreply@github.com>2017-09-12 12:25:08 -0700
commitdf92533524ab6d02d1b90370197518b0f2cfde57 (patch)
tree61e262374bfb04a146c30da9f3d9ede31d2c5ead /grpclb
parentec600feb87b5da620cda2158bcbcd3c582359878 (diff)
downloadgrpc-grpc-java-df92533524ab6d02d1b90370197518b0f2cfde57.tar.gz
grpclb: fallback to backend addresses (#3439)
The GRPCLB client will use the backend addresses from resolver if it has not received any server list from any balancer after a certain timeout (10s).
Diffstat (limited to 'grpclb')
-rw-r--r--grpclb/src/main/java/io/grpc/grpclb/BackendAddressGroup.java42
-rw-r--r--grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java38
-rw-r--r--grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java247
-rw-r--r--grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java232
4 files changed, 462 insertions, 97 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/BackendAddressGroup.java b/grpclb/src/main/java/io/grpc/grpclb/BackendAddressGroup.java
new file mode 100644
index 000000000..8e134b0ed
--- /dev/null
+++ b/grpclb/src/main/java/io/grpc/grpclb/BackendAddressGroup.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2017, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.grpclb;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.grpc.EquivalentAddressGroup;
+import javax.annotation.Nullable;
+
+final class BackendAddressGroup {
+ private final EquivalentAddressGroup addresses;
+ @Nullable
+ private final String token;
+
+ BackendAddressGroup(EquivalentAddressGroup addresses, @Nullable String token) {
+ this.addresses = checkNotNull(addresses, "addresses");
+ this.token = token;
+ }
+
+ EquivalentAddressGroup getAddresses() {
+ return addresses;
+ }
+
+ @Nullable
+ String getToken() {
+ return token;
+ }
+}
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
index 0cb851163..2ef6a1901 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
@@ -28,8 +28,8 @@ import io.grpc.grpclb.GrpclbConstants.LbPolicy;
import io.grpc.internal.LogId;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.WithLogId;
-import java.net.SocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
@@ -112,6 +112,9 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
}
}
+ newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups);
+ newBackendServers = Collections.unmodifiableList(newBackendServers);
+
if (!newLbAddressGroups.isEmpty()) {
if (newLbPolicy != LbPolicy.GRPCLB) {
newLbPolicy = LbPolicy.GRPCLB;
@@ -139,7 +142,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
grpclbState.propagateError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no LB address while asking for GRPCLB"));
} else {
- grpclbState.setLbAddress(flattenLbAddressGroups(newLbAddressGroups));
+ grpclbState.updateAddresses(newLbAddressGroups, newBackendServers);
}
break;
default:
@@ -211,35 +214,4 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
LbPolicy getLbPolicy() {
return lbPolicy;
}
-
- private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) {
- assert !groupList.isEmpty();
- List<EquivalentAddressGroup> eags = new ArrayList<EquivalentAddressGroup>(groupList.size());
- String authority = groupList.get(0).getAuthority();
- for (LbAddressGroup group : groupList) {
- if (!authority.equals(group.getAuthority())) {
- // TODO(ejona): Allow different authorities for different addresses. Requires support from
- // Helper.
- logger.log(Level.WARNING,
- "[{0}] Multiple authorities found for LB. "
- + "Skipping addresses for {0} in preference to {1}",
- new Object[] {logId, group.getAuthority(), authority});
- } else {
- eags.add(group.getAddresses());
- }
- }
- return new LbAddressGroup(flattenEquivalentAddressGroup(eags), authority);
- }
-
- /**
- * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object.
- */
- private static EquivalentAddressGroup flattenEquivalentAddressGroup(
- List<EquivalentAddressGroup> groupList) {
- List<SocketAddress> addrs = new ArrayList<SocketAddress>();
- for (EquivalentAddressGroup group : groupList) {
- addrs.addAll(group.getAddresses());
- }
- return new EquivalentAddressGroup(addrs);
- }
}
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
index 39c4368a6..203f055e4 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
@@ -46,6 +46,7 @@ import io.grpc.internal.LogId;
import io.grpc.stub.StreamObserver;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -68,12 +69,12 @@ import javax.annotation.concurrent.NotThreadSafe;
* GrpclbLoadBalancer switches to GRPCLB mode. Closed and discarded when GrpclbLoadBalancer
* switches away from GRPCLB mode.
*/
-// TODO(zhangkun83): round-robin on the backend list from the resolver if we don't get a server list
-// within a configurable timeout.
@NotThreadSafe
final class GrpclbState {
private static final Logger logger = Logger.getLogger(GrpclbState.class.getName());
+ static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
+
@VisibleForTesting
static final PickResult DROP_PICK_RESULT =
PickResult.withError(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));
@@ -84,6 +85,11 @@ final class GrpclbState {
public PickResult picked(Metadata headers) {
return PickResult.withNoResult();
}
+
+ @Override
+ public String toString() {
+ return "BUFFER_ENTRY";
+ }
};
private final LogId logId;
@@ -95,6 +101,13 @@ final class GrpclbState {
private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
+ // Once set, never go back to null.
+ @Nullable
+ private ScheduledFuture<?> fallbackTimer;
+ private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
+ private boolean fallbackTimerExpired;
+ private boolean receivedServerListFromBalancer;
+
@Nullable
private ManagedChannel lbCommChannel;
@@ -132,9 +145,11 @@ final class GrpclbState {
}
/**
- * Set the address of the balancer, and create connection if not yet connected.
+ * Set the new addresses of the balancer and backends, and create connection if not yet connected.
*/
- void setLbAddress(LbAddressGroup newLbAddressGroup) {
+ void updateAddresses(
+ List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) {
+ LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups);
startLbComm(newLbAddressGroup);
// Avoid creating a new RPC just because the addresses were updated, as it can cause a
// stampeding herd. The current RPC may be on a connection to an address not present in
@@ -143,6 +158,30 @@ final class GrpclbState {
if (lbStream == null) {
startLbRpc();
}
+ // If we don't receive server list from the balancer within the timeout, we round-robin on
+ // the backend list from the resolver (aka fallback), until the balancer returns a server list.
+ fallbackBackendList = newBackendServers;
+ if (fallbackTimer == null) {
+ fallbackTimer =
+ timerService.schedule(new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } else {
+ maybeUseFallbackBackends();
+ }
+ }
+
+ private void maybeUseFallbackBackends() {
+ // Only use fallback backends after fallback timer expired and before receiving server list from
+ // the balancer.
+ if (receivedServerListFromBalancer || !fallbackTimerExpired) {
+ return;
+ }
+ List<DropEntry> newDropList = new ArrayList<DropEntry>();
+ List<BackendAddressGroup> newBackendAddrList = new ArrayList<BackendAddressGroup>();
+ for (EquivalentAddressGroup eag : fallbackBackendList) {
+ newDropList.add(null);
+ newBackendAddrList.add(new BackendAddressGroup(eag, null));
+ }
+ updateRoundRobinLists(newDropList, newBackendAddrList, null);
}
private void shutdownLbComm() {
@@ -179,6 +218,7 @@ final class GrpclbState {
checkState(lbStream == null, "previous lbStream has not been cleared yet");
LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
lbStream = new LbStream(stub);
+ lbStream.start();
LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
.setInitialRequest(InitialLoadBalanceRequest.newBuilder()
@@ -217,33 +257,106 @@ final class GrpclbState {
return lbStream.loadRecorder;
}
+ private void updateRoundRobinLists(
+ List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList,
+ @Nullable GrpclbClientLoadRecorder loadRecorder) {
+ HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap =
+ new HashMap<EquivalentAddressGroup, Subchannel>();
+ List<BackendEntry> newBackendList = new ArrayList<BackendEntry>();
+
+ for (BackendAddressGroup backendAddr : newBackendAddrList) {
+ EquivalentAddressGroup eag = backendAddr.getAddresses();
+ Subchannel subchannel = newSubchannelMap.get(eag);
+ if (subchannel == null) {
+ subchannel = subchannels.get(eag);
+ if (subchannel == null) {
+ Attributes subchannelAttrs = Attributes.newBuilder()
+ .set(STATE_INFO,
+ new AtomicReference<ConnectivityStateInfo>(
+ ConnectivityStateInfo.forNonError(IDLE)))
+ .build();
+ subchannel = helper.createSubchannel(eag, subchannelAttrs);
+ subchannel.requestConnection();
+ }
+ newSubchannelMap.put(eag, subchannel);
+ }
+ BackendEntry entry;
+ // Only picks with tokens are reported to LoadRecorder
+ if (backendAddr.getToken() == null) {
+ entry = new BackendEntry(subchannel);
+ } else {
+ entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken());
+ }
+ newBackendList.add(entry);
+ }
+
+ // Close Subchannels whose addresses have been delisted
+ for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) {
+ EquivalentAddressGroup eag = entry.getKey();
+ if (!newSubchannelMap.containsKey(eag)) {
+ entry.getValue().shutdown();
+ }
+ }
+
+ subchannels = Collections.unmodifiableMap(newSubchannelMap);
+ dropList = Collections.unmodifiableList(newDropList);
+ backendList = Collections.unmodifiableList(newBackendList);
+ maybeUpdatePicker();
+ }
+
+ @VisibleForTesting
+ class FallbackModeTask implements Runnable {
+ @Override
+ public void run() {
+ helper.runSerialized(new Runnable() {
+ @Override
+ public void run() {
+ fallbackTimerExpired = true;
+ maybeUseFallbackBackends();
+ }
+ });
+ }
+ }
+
+ @VisibleForTesting
+ class LoadReportingTask implements Runnable {
+ private final LbStream stream;
+
+ LoadReportingTask(LbStream stream) {
+ this.stream = stream;
+ }
+
+ @Override
+ public void run() {
+ helper.runSerialized(new Runnable() {
+ @Override
+ public void run() {
+ stream.loadReportFuture = null;
+ stream.sendLoadReport();
+ }
+ });
+ }
+ }
+
private class LbStream implements StreamObserver<LoadBalanceResponse> {
- final StreamObserver<LoadBalanceRequest> lbRequestWriter;
final GrpclbClientLoadRecorder loadRecorder;
-
- final Runnable loadReportRunnable = new Runnable() {
- @Override
- public void run() {
- helper.runSerialized(new Runnable() {
- @Override
- public void run() {
- loadReportTask = null;
- sendLoadReport();
- }
- });
- }
- };
+ final LoadBalancerGrpc.LoadBalancerStub stub;
+ StreamObserver<LoadBalanceRequest> lbRequestWriter;
// These fields are only accessed from helper.runSerialized()
boolean initialResponseReceived;
boolean closed;
long loadReportIntervalMillis = -1;
- ScheduledFuture<?> loadReportTask;
+ ScheduledFuture<?> loadReportFuture;
LbStream(LoadBalancerGrpc.LoadBalancerStub stub) {
+ this.stub = checkNotNull(stub, "stub");
// Stats data only valid for current LbStream. We do not carry over data from previous
// stream.
loadRecorder = new GrpclbClientLoadRecorder(time);
+ }
+
+ void start() {
lbRequestWriter = stub.withWaitForReady().balanceLoad(this);
}
@@ -294,8 +407,8 @@ final class GrpclbState {
private void scheduleNextLoadReport() {
if (loadReportIntervalMillis > 0) {
- loadReportTask = timerService.schedule(
- loadReportRunnable, loadReportIntervalMillis, TimeUnit.MILLISECONDS);
+ loadReportFuture = timerService.schedule(
+ new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS);
}
}
@@ -330,12 +443,14 @@ final class GrpclbState {
return;
}
+ receivedServerListFromBalancer = true;
+ if (fallbackTimer != null) {
+ fallbackTimer.cancel(false);
+ }
// TODO(zhangkun83): handle delegate from initialResponse
ServerList serverList = response.getServerList();
- HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap =
- new HashMap<EquivalentAddressGroup, Subchannel>();
List<DropEntry> newDropList = new ArrayList<DropEntry>();
- List<BackendEntry> newBackendList = new ArrayList<BackendEntry>();
+ List<BackendAddressGroup> newBackendAddrList = new ArrayList<BackendAddressGroup>();
// Construct the new collections. Create new Subchannels when necessary.
for (Server server : serverList.getServersList()) {
String token = server.getLoadBalanceToken();
@@ -352,35 +467,10 @@ final class GrpclbState {
continue;
}
EquivalentAddressGroup eag = new EquivalentAddressGroup(address);
- Subchannel subchannel = newSubchannelMap.get(eag);
- if (subchannel == null) {
- subchannel = subchannels.get(eag);
- if (subchannel == null) {
- Attributes subchannelAttrs = Attributes.newBuilder()
- .set(STATE_INFO,
- new AtomicReference<ConnectivityStateInfo>(
- ConnectivityStateInfo.forNonError(IDLE)))
- .build();
- subchannel = helper.createSubchannel(eag, subchannelAttrs);
- subchannel.requestConnection();
- }
- newSubchannelMap.put(eag, subchannel);
- }
- newBackendList.add(new BackendEntry(subchannel, loadRecorder, token));
- }
- }
- // Close Subchannels whose addresses have been delisted
- for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) {
- EquivalentAddressGroup eag = entry.getKey();
- if (!newSubchannelMap.containsKey(eag)) {
- entry.getValue().shutdown();
+ newBackendAddrList.add(new BackendAddressGroup(eag, token));
}
}
-
- subchannels = Collections.unmodifiableMap(newSubchannelMap);
- dropList = Collections.unmodifiableList(newDropList);
- backendList = Collections.unmodifiableList(newBackendList);
- maybeUpdatePicker();
+ updateRoundRobinLists(newDropList, newBackendAddrList, loadRecorder);
}
private void handleStreamClosed(Status status) {
@@ -411,9 +501,9 @@ final class GrpclbState {
}
private void cleanUp() {
- if (loadReportTask != null) {
- loadReportTask.cancel(false);
- loadReportTask = null;
+ if (loadReportFuture != null) {
+ loadReportFuture.cancel(false);
+ loadReportFuture = null;
}
if (lbStream == this) {
lbStream = null;
@@ -479,6 +569,37 @@ final class GrpclbState {
helper.updateBalancingState(state, picker);
}
+ private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) {
+ assert !groupList.isEmpty();
+ List<EquivalentAddressGroup> eags = new ArrayList<EquivalentAddressGroup>(groupList.size());
+ String authority = groupList.get(0).getAuthority();
+ for (LbAddressGroup group : groupList) {
+ if (!authority.equals(group.getAuthority())) {
+ // TODO(ejona): Allow different authorities for different addresses. Requires support from
+ // Helper.
+ logger.log(Level.WARNING,
+ "[{0}] Multiple authorities found for LB. "
+ + "Skipping addresses for {0} in preference to {1}",
+ new Object[] {logId, group.getAuthority(), authority});
+ } else {
+ eags.add(group.getAddresses());
+ }
+ }
+ return new LbAddressGroup(flattenEquivalentAddressGroup(eags), authority);
+ }
+
+ /**
+ * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object.
+ */
+ private static EquivalentAddressGroup flattenEquivalentAddressGroup(
+ List<EquivalentAddressGroup> groupList) {
+ List<SocketAddress> addrs = new ArrayList<SocketAddress>();
+ for (EquivalentAddressGroup group : groupList) {
+ addrs.addAll(group.getAddresses());
+ }
+ return new EquivalentAddressGroup(addrs);
+ }
+
@VisibleForTesting
static final class DropEntry {
private final GrpclbClientLoadRecorder loadRecorder;
@@ -525,19 +646,35 @@ final class GrpclbState {
static final class BackendEntry implements RoundRobinEntry {
@VisibleForTesting
final PickResult result;
+ @Nullable
private final GrpclbClientLoadRecorder loadRecorder;
+ @Nullable
private final String token;
+ /**
+ * Creates a BackendEntry whose usage will be reported to load recorder.
+ */
BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) {
this.result = PickResult.withSubchannel(subchannel, loadRecorder);
this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
this.token = checkNotNull(token, "token");
}
+ /**
+ * Creates a BackendEntry whose usage will not be reported.
+ */
+ BackendEntry(Subchannel subchannel) {
+ this.result = PickResult.withSubchannel(subchannel);
+ this.loadRecorder = null;
+ this.token = null;
+ }
+
@Override
public PickResult picked(Metadata headers) {
headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
- headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
+ if (token != null) {
+ headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
+ }
return result;
}
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index c57d38f3a..23c2efb51 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -82,6 +82,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -101,6 +102,20 @@ public class GrpclbLoadBalancerTest {
private static final Attributes.Key<String> RESOLUTION_ATTR =
Attributes.Key.of("resolution-attr");
private static final String SERVICE_AUTHORITY = "api.google.com";
+ private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
+ new FakeClock.TaskFilter() {
+ @Override
+ public boolean shouldAccept(Runnable command) {
+ return command instanceof GrpclbState.LoadReportingTask;
+ }
+ };
+ private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER =
+ new FakeClock.TaskFilter() {
+ @Override
+ public boolean shouldAccept(Runnable command) {
+ return command instanceof GrpclbState.FallbackModeTask;
+ }
+ };
@Mock
private Helper helper;
@@ -356,11 +371,11 @@ public class GrpclbLoadBalancerTest {
.build()));
// Simulate receiving LB response
- assertEquals(0, fakeClock.numPendingTasks());
+ assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
// Load reporting task is scheduled
- assertEquals(1, fakeClock.numPendingTasks());
+ assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
assertEquals(0, fakeClock.runDueTasks());
List<ServerEntry> backends = Arrays.asList(
@@ -569,18 +584,19 @@ public class GrpclbLoadBalancerTest {
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
// Simulate LB initial response
- assertEquals(0, fakeClock.numPendingTasks());
+ assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
lbResponseObserver.onNext(buildInitialResponse(1983));
// Load reporting task is scheduled
- assertEquals(1, fakeClock.numPendingTasks());
+ assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next();
assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
// Simulate an abundant LB initial response, with a different report interval
lbResponseObserver.onNext(buildInitialResponse(9097));
// It doesn't affect load-reporting at all
- assertThat(fakeClock.getPendingTasks()).containsExactly(scheduledTask);
+ assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER))
+ .containsExactly(scheduledTask);
assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
}
@@ -607,11 +623,11 @@ public class GrpclbLoadBalancerTest {
.build()));
// Simulate receiving LB response
- assertEquals(0, fakeClock.numPendingTasks());
+ assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
lbResponseObserver.onNext(buildInitialResponse(1983));
// Load reporting task is scheduled
- assertEquals(1, fakeClock.numPendingTasks());
+ assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next();
assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
@@ -619,14 +635,14 @@ public class GrpclbLoadBalancerTest {
lbResponseObserver.onCompleted();
// Reporting task cancelled
- assertEquals(0, fakeClock.numPendingTasks());
+ assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
// Simulate a race condition where the task has just started when its cancelled
scheduledTask.command.run();
// No report sent. No new task scheduled
inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class));
- assertEquals(0, fakeClock.numPendingTasks());
+ assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
}
private void assertNextReport(
@@ -1001,6 +1017,9 @@ public class GrpclbLoadBalancerTest {
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
+ // Timer for fallback mode is registered
+ assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
@@ -1014,6 +1033,8 @@ public class GrpclbLoadBalancerTest {
eq(new EquivalentAddressGroup(backends1.get(0).addr)), any(Attributes.class));
inOrder.verify(helper).createSubchannel(
eq(new EquivalentAddressGroup(backends1.get(1).addr)), any(Attributes.class));
+ // Timer for fallback mode is cancelled as soon as the balancer returns a server list
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll();
@@ -1162,6 +1183,199 @@ public class GrpclbLoadBalancerTest {
}
@Test
+ public void grpclbFallbackToBackendsFromResolver() {
+ long loadReportIntervalMillis = 1983;
+ InOrder helperInOrder = inOrder(helper);
+
+ // Create a resolution list with a mixture of balancer and backend addresses
+ List<EquivalentAddressGroup> resolutionList =
+ createResolvedServerAddresses(false, true, false);
+ Attributes resolutionAttrs = Attributes.newBuilder()
+ .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
+ deliverResolvedAddresses(resolutionList, resolutionAttrs);
+
+ assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
+ helperInOrder.verify(helper).createOobChannel(
+ addrsEq(resolutionList.get(1)), eq(lbAuthority(0)));
+
+ // Attempted to connect to balancer
+ assertEquals(1, fakeOobChannels.size());
+ ManagedChannel oobChannel = fakeOobChannels.poll();
+ verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
+ StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
+
+ verify(lbRequestObserver).onNext(
+ eq(LoadBalanceRequest.newBuilder().setInitialRequest(
+ InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
+ .build()));
+ // Receiving the initial response won't reset the fallback timer. Only reciving the server list
+ // does.
+ lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
+ // We don't care if runSerialized() has been run.
+ helperInOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class));
+ helperInOrder.verifyNoMoreInteractions();
+
+ ////////////////////////////
+ // Fallback timer expires
+ ////////////////////////////
+ assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
+ assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ fakeClock.forwardTime(1, TimeUnit.MILLISECONDS);
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+
+ // Fall back to the backends from resolver
+ fallbackTestVerifyUseOfFallbackBackendLists(
+ helperInOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
+
+ assertNull(balancer.getDelegate());
+ assertFalse(oobChannel.isShutdown());
+ verify(lbRequestObserver, never()).onCompleted();
+
+ ////////////////////////////////////////////////////////
+ // Name resolver sends new list without any backend addr
+ ////////////////////////////////////////////////////////
+ resolutionList = createResolvedServerAddresses(true, true);
+ deliverResolvedAddresses(resolutionList, resolutionAttrs);
+ assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
+
+ // New addresses are updated to the OobChannel
+ helperInOrder.verify(helper).updateOobChannelAddresses(
+ same(oobChannel),
+ eq(new EquivalentAddressGroup(
+ Arrays.asList(
+ resolutionList.get(0).getAddresses().get(0),
+ resolutionList.get(1).getAddresses().get(0)))));
+
+ // Still in fallback logic, except that the backend list is empty
+ fallbackTestVerifyUseOfFallbackBackendLists(
+ helperInOrder, helper, Collections.<EquivalentAddressGroup>emptyList());
+
+ //////////////////////////////////////////////////
+ // Name resolver sends new list with backend addrs
+ //////////////////////////////////////////////////
+ resolutionList = createResolvedServerAddresses(true, false, false);
+ deliverResolvedAddresses(resolutionList, resolutionAttrs);
+ assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
+
+ // New LB address is updated to the OobChannel
+ helperInOrder.verify(helper).updateOobChannelAddresses(
+ same(oobChannel),
+ addrsEq(resolutionList.get(0)));
+
+ // New backend addresses are used for fallback
+ fallbackTestVerifyUseOfFallbackBackendLists(
+ helperInOrder, helper, Arrays.asList(resolutionList.get(1), resolutionList.get(2)));
+
+ ///////////////////////
+ // Break the LB stream
+ ///////////////////////
+ lbResponseObserver.onError(Status.UNAVAILABLE.asException());
+
+ // The error will NOT propagate to picker because fallback list is in use.
+ helperInOrder.verify(helper, never())
+ .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
+
+ // A new stream is created
+ verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
+ lbResponseObserver = lbResponseObserverCaptor.getValue();
+ assertEquals(1, lbRequestObservers.size());
+ lbRequestObserver = lbRequestObservers.poll();
+ verify(lbRequestObserver).onNext(
+ eq(LoadBalanceRequest.newBuilder().setInitialRequest(
+ InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
+ .build()));
+
+ /////////////////////////////////
+ // Balancer returns a server list
+ /////////////////////////////////
+ List<ServerEntry> serverList = Arrays.asList(
+ new ServerEntry("127.0.0.1", 2000, "token0001"),
+ new ServerEntry("127.0.0.1", 2010, "token0002"));
+ lbResponseObserver.onNext(buildInitialResponse());
+ lbResponseObserver.onNext(buildLbResponse(serverList));
+
+ // Fallback mode ends
+ fallbackTestVerifyUseOfBalancerBackendLists(helperInOrder, helper, serverList);
+
+ ///////////////////////////////////////////////////////////////
+ // New backend addresses from resolver outside of fallback mode
+ ///////////////////////////////////////////////////////////////
+ resolutionList = createResolvedServerAddresses(true, false);
+ deliverResolvedAddresses(resolutionList, resolutionAttrs);
+ assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
+ // Will not affect the round robin list at all
+ helperInOrder.verify(helper, never())
+ .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
+
+ // Fallback mode is one-shot only.
+ assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
+ }
+
+ private void fallbackTestVerifyUseOfFallbackBackendLists(
+ InOrder inOrder, Helper helper, List<EquivalentAddressGroup> addrs) {
+ fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, null);
+ }
+
+ private void fallbackTestVerifyUseOfBalancerBackendLists(
+ InOrder inOrder, Helper helper, List<ServerEntry> servers) {
+ ArrayList<EquivalentAddressGroup> addrs = new ArrayList<EquivalentAddressGroup>();
+ ArrayList<String> tokens = new ArrayList<String>();
+ for (ServerEntry server : servers) {
+ addrs.add(new EquivalentAddressGroup(server.addr));
+ tokens.add(server.token);
+ }
+ fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, tokens);
+ }
+
+ private void fallbackTestVerifyUseOfBackendLists(
+ InOrder inOrder, Helper helper, List<EquivalentAddressGroup> addrs,
+ @Nullable List<String> tokens) {
+ if (tokens != null) {
+ assertEquals(addrs.size(), tokens.size());
+ }
+ for (EquivalentAddressGroup addr : addrs) {
+ inOrder.verify(helper).createSubchannel(addrsEq(addr), any(Attributes.class));
+ }
+ inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
+ RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
+ assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
+ assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY);
+ assertEquals(addrs.size(), mockSubchannels.size());
+ ArrayList<Subchannel> subchannels = new ArrayList<Subchannel>(mockSubchannels);
+ mockSubchannels.clear();
+ for (Subchannel subchannel : subchannels) {
+ deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
+ }
+ inOrder.verify(helper, atLeast(0))
+ .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
+ inOrder.verify(helper, never())
+ .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
+
+ ArrayList<BackendEntry> pickList = new ArrayList<BackendEntry>();
+ for (int i = 0; i < addrs.size(); i++) {
+ Subchannel subchannel = subchannels.get(i);
+ BackendEntry backend;
+ if (tokens == null) {
+ backend = new BackendEntry(subchannel);
+ } else {
+ backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i));
+ }
+ pickList.add(backend);
+ deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
+ inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
+ picker = (RoundRobinPicker) pickerCaptor.getValue();
+ assertThat(picker.dropList)
+ .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null));
+ assertThat(picker.pickList).containsExactlyElementsIn(pickList);
+ inOrder.verify(helper, never())
+ .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
+ }
+ }
+
+ @Test
public void grpclbMultipleAuthorities() throws Exception {
List<EquivalentAddressGroup> grpclbResolutionList = Arrays.asList(
new EquivalentAddressGroup(