aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChengyuan Zhang <chengyuanzhang@google.com>2020-04-07 11:44:36 -0700
committerGitHub <noreply@github.com>2020-04-07 11:44:36 -0700
commitd88f0f19ec128fb4ee3550f221949aee3fc9cbd9 (patch)
tree5008ef1ece40ccf4f5765706da99032ead83b31b
parent58a92b7530d4d971173711add8c30b4d471c478c (diff)
downloadgrpc-grpc-java-d88f0f19ec128fb4ee3550f221949aee3fc9cbd9.tar.gz
xds: implement LRS LB policy (#6858)
Part of xDS LB policy refactoring work. Implement the LRS LB policy for "balancing" endpoints within a certain locality.
-rw-r--r--xds/src/main/java/io/grpc/xds/EnvoyProtoData.java22
-rw-r--r--xds/src/main/java/io/grpc/xds/LocalityStore.java2
-rw-r--r--xds/src/main/java/io/grpc/xds/LrsLoadBalancer.java123
-rw-r--r--xds/src/main/java/io/grpc/xds/LrsLoadBalancerProvider.java84
-rw-r--r--xds/src/main/java/io/grpc/xds/XdsAttributes.java5
-rw-r--r--xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java2
-rw-r--r--xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java334
-rw-r--r--xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java4
8 files changed, 560 insertions, 16 deletions
diff --git a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
index 65bc3a453..8afbe433b 100644
--- a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
+++ b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java
@@ -51,28 +51,26 @@ final class EnvoyProtoData {
static final class Locality {
private final String region;
private final String zone;
- private final String subzone;
+ private final String subZone;
- /** Must only be used for testing. */
- @VisibleForTesting
- Locality(String region, String zone, String subzone) {
+ Locality(String region, String zone, String subZone) {
this.region = region;
this.zone = zone;
- this.subzone = subzone;
+ this.subZone = subZone;
}
static Locality fromEnvoyProtoLocality(io.envoyproxy.envoy.api.v2.core.Locality locality) {
return new Locality(
/* region = */ locality.getRegion(),
/* zone = */ locality.getZone(),
- /* subzone = */ locality.getSubZone());
+ /* subZone = */ locality.getSubZone());
}
io.envoyproxy.envoy.api.v2.core.Locality toEnvoyProtoLocality() {
return io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
.setRegion(region)
.setZone(zone)
- .setSubZone(subzone)
+ .setSubZone(subZone)
.build();
}
@@ -84,8 +82,8 @@ final class EnvoyProtoData {
return zone;
}
- String getSubzone() {
- return subzone;
+ String getSubZone() {
+ return subZone;
}
@Override
@@ -99,12 +97,12 @@ final class EnvoyProtoData {
Locality locality = (Locality) o;
return Objects.equals(region, locality.region)
&& Objects.equals(zone, locality.zone)
- && Objects.equals(subzone, locality.subzone);
+ && Objects.equals(subZone, locality.subZone);
}
@Override
public int hashCode() {
- return Objects.hash(region, zone, subzone);
+ return Objects.hash(region, zone, subZone);
}
@Override
@@ -112,7 +110,7 @@ final class EnvoyProtoData {
return MoreObjects.toStringHelper(this)
.add("region", region)
.add("zone", zone)
- .add("subzone", subzone)
+ .add("subZone", subZone)
.toString();
}
}
diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java
index 60a24d5e7..ca54757ce 100644
--- a/xds/src/main/java/io/grpc/xds/LocalityStore.java
+++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java
@@ -421,7 +421,7 @@ interface LocalityStore {
@Override
public String getAuthority() {
//FIXME: This should be a new proposed field of Locality, locality_name
- return locality.getSubzone();
+ return locality.getSubZone();
}
};
orcaReportingHelperWrapper =
diff --git a/xds/src/main/java/io/grpc/xds/LrsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/LrsLoadBalancer.java
new file mode 100644
index 000000000..55ca45127
--- /dev/null
+++ b/xds/src/main/java/io/grpc/xds/LrsLoadBalancer.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import io.grpc.ConnectivityState;
+import io.grpc.LoadBalancer;
+import io.grpc.Status;
+import io.grpc.util.ForwardingLoadBalancerHelper;
+import io.grpc.util.GracefulSwitchLoadBalancer;
+import io.grpc.xds.ClientLoadCounter.LoadRecordingSubchannelPicker;
+import io.grpc.xds.EnvoyProtoData.Locality;
+import io.grpc.xds.LrsLoadBalancerProvider.LrsConfig;
+import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
+import java.util.Objects;
+import javax.annotation.CheckForNull;
+
+/**
+ * Load balancer for lrs policy.
+ */
+final class LrsLoadBalancer extends LoadBalancer {
+ private final LoadBalancer.Helper helper;
+ @CheckForNull
+ private GracefulSwitchLoadBalancer switchingLoadBalancer;
+ private LoadStatsStore loadStatsStore;
+ private String clusterName;
+ private String edsServiceName;
+ private Locality locality;
+ private String childPolicyName;
+
+ LrsLoadBalancer(LoadBalancer.Helper helper) {
+ this.helper = checkNotNull(helper, "helper");
+ }
+
+ @Override
+ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
+ LrsConfig config = (LrsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
+ LoadStatsStore store =
+ resolvedAddresses.getAttributes().get(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE);
+ checkNotNull(config, "missing LRS lb config");
+ checkNotNull(store, "missing cluster service stats object");
+ checkAndSetUp(config, store);
+
+ if (switchingLoadBalancer == null) {
+ loadStatsStore.addLocality(config.locality);
+ final ClientLoadCounter counter = loadStatsStore.getLocalityCounter(config.locality);
+ LoadBalancer.Helper loadRecordingHelper = new ForwardingLoadBalancerHelper() {
+ @Override
+ protected Helper delegate() {
+ return helper;
+ }
+
+ @Override
+ public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
+ SubchannelPicker loadRecordingPicker =
+ new LoadRecordingSubchannelPicker(counter, newPicker);
+ super.updateBalancingState(newState, loadRecordingPicker);
+ }
+ };
+ switchingLoadBalancer = new GracefulSwitchLoadBalancer(loadRecordingHelper);
+ }
+ String updatedChildPolicyName = config.childPolicy.getProvider().getPolicyName();
+ if (!Objects.equals(childPolicyName, updatedChildPolicyName)) {
+ switchingLoadBalancer.switchTo(config.childPolicy.getProvider());
+ childPolicyName = updatedChildPolicyName;
+ }
+ ResolvedAddresses downStreamResult =
+ resolvedAddresses.toBuilder()
+ .setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
+ .build();
+ switchingLoadBalancer.handleResolvedAddresses(downStreamResult);
+ }
+
+ @Override
+ public void handleNameResolutionError(Status error) {
+ if (switchingLoadBalancer != null) {
+ switchingLoadBalancer.handleNameResolutionError(error);
+ } else {
+ helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker(error));
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (switchingLoadBalancer != null) {
+ loadStatsStore.removeLocality(locality);
+ switchingLoadBalancer.shutdown();
+ }
+ }
+
+ private void checkAndSetUp(LrsConfig config, LoadStatsStore store) {
+ checkState(
+ clusterName == null || clusterName.equals(config.clusterName),
+ "cluster name should not change");
+ checkState(
+ edsServiceName == null || edsServiceName.equals(config.edsServiceName),
+ "edsServiceName should not change");
+ checkState(locality == null || locality.equals(config.locality), "locality should not change");
+ checkState(
+ loadStatsStore == null || loadStatsStore.equals(store),
+ "loadStatsStore should not change");
+ clusterName = config.clusterName;
+ edsServiceName = config.edsServiceName;
+ locality = config.locality;
+ loadStatsStore = store;
+ }
+}
diff --git a/xds/src/main/java/io/grpc/xds/LrsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/LrsLoadBalancerProvider.java
new file mode 100644
index 000000000..d4a663b26
--- /dev/null
+++ b/xds/src/main/java/io/grpc/xds/LrsLoadBalancerProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.grpc.Internal;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.NameResolver.ConfigOrError;
+import io.grpc.internal.ServiceConfigUtil.PolicySelection;
+import io.grpc.xds.EnvoyProtoData.Locality;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Provider for lrs load balancing policy.
+ */
+@Internal
+public final class LrsLoadBalancerProvider extends LoadBalancerProvider {
+
+ private static final String LRS_POLICY_NAME = "lrs_experimental";
+
+ @Override
+ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
+ return new LrsLoadBalancer(helper);
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public int getPriority() {
+ return 5;
+ }
+
+ @Override
+ public String getPolicyName() {
+ return LRS_POLICY_NAME;
+ }
+
+ @Override
+ public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawConfig) {
+ throw new UnsupportedOperationException();
+ }
+
+ static final class LrsConfig {
+ final String clusterName;
+ @Nullable
+ final String edsServiceName;
+ final String lrsServerName;
+ final Locality locality;
+ final PolicySelection childPolicy;
+
+ LrsConfig(
+ String clusterName,
+ @Nullable String edsServiceName,
+ String lrsServerName,
+ Locality locality,
+ PolicySelection childPolicy) {
+ this.clusterName = checkNotNull(clusterName, "clusterName");
+ this.edsServiceName = edsServiceName;
+ this.lrsServerName = checkNotNull(lrsServerName, "lrsServerName");
+ this.locality = checkNotNull(locality, "locality");
+ this.childPolicy = checkNotNull(childPolicy, "childPolicy");
+ }
+ }
+}
diff --git a/xds/src/main/java/io/grpc/xds/XdsAttributes.java b/xds/src/main/java/io/grpc/xds/XdsAttributes.java
index 85b8ddd3d..8276c98ad 100644
--- a/xds/src/main/java/io/grpc/xds/XdsAttributes.java
+++ b/xds/src/main/java/io/grpc/xds/XdsAttributes.java
@@ -79,5 +79,10 @@ public final class XdsAttributes {
static final Attributes.Key<ObjectPool<XdsClient>> XDS_CLIENT_POOL =
Attributes.Key.create("io.grpc.xds.XdsAttributes.xdsClientPool");
+ // TODO (chengyuanzhang): temporary solution for migrating to LRS policy. Should access
+ // stats object via XdsClient interface.
+ static final Attributes.Key<LoadStatsStore> ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE =
+ Attributes.Key.create("io.grpc.xds.XdsAttributes.loadStatsStore");
+
private XdsAttributes() {}
}
diff --git a/xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java b/xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java
index 4bb0f991b..154bfe286 100644
--- a/xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java
+++ b/xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java
@@ -41,7 +41,7 @@ public class EnvoyProtoDataTest {
Locality xdsLocality = Locality.fromEnvoyProtoLocality(locality);
assertThat(xdsLocality.getRegion()).isEqualTo("test_region");
assertThat(xdsLocality.getZone()).isEqualTo("test_zone");
- assertThat(xdsLocality.getSubzone()).isEqualTo("test_subzone");
+ assertThat(xdsLocality.getSubZone()).isEqualTo("test_subzone");
io.envoyproxy.envoy.api.v2.core.Locality convertedLocality = xdsLocality.toEnvoyProtoLocality();
assertThat(convertedLocality.getRegion()).isEqualTo("test_region");
diff --git a/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java
new file mode 100644
index 000000000..e845a2ccc
--- /dev/null
+++ b/xds/src/test/java/io/grpc/xds/LrsLoadBalancerTest.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2020 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
+import io.grpc.Attributes;
+import io.grpc.ClientStreamTracer;
+import io.grpc.ConnectivityState;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancer.PickResult;
+import io.grpc.LoadBalancer.PickSubchannelArgs;
+import io.grpc.LoadBalancer.ResolvedAddresses;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelPicker;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.Status;
+import io.grpc.internal.ServiceConfigUtil.PolicySelection;
+import io.grpc.xds.ClientLoadCounter.LoadRecordingStreamTracerFactory;
+import io.grpc.xds.ClientLoadCounter.LoadRecordingSubchannelPicker;
+import io.grpc.xds.EnvoyProtoData.Locality;
+import io.grpc.xds.LrsLoadBalancerProvider.LrsConfig;
+import java.net.SocketAddress;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+/**
+ * Unit tests for {@link LrsLoadBalancer}.
+ */
+@RunWith(JUnit4.class)
+public class LrsLoadBalancerTest {
+ @Rule
+ public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ private static final String CLUSTER_NAME = "cluster-foo.googleapis.com";
+ private static final String EDS_SERVICE_NAME = "cluster-foo:service-blade";
+ private static final String LRS_SERVER_NAME = "trafficdirector.googleapis.com";
+ private static final Locality TEST_LOCALITY =
+ new Locality("test-region", "test-zone", "test-subzone");
+
+ private final ClientLoadCounter counter = new ClientLoadCounter();
+ private final LoadRecorder loadRecorder = new LoadRecorder();
+ private final Queue<LoadBalancer> childBalancers = new ArrayDeque<>();
+
+ @Mock
+ private Helper helper;
+ private LrsLoadBalancer loadBalancer;
+
+ @Before
+ public void setUp() {
+ loadBalancer = new LrsLoadBalancer(helper);
+ }
+
+ @After
+ public void tearDown() {
+ loadBalancer.shutdown();
+ }
+
+ @Test
+ public void subchannelPickerInterceptedWithLoadRecording() {
+ List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
+ deliverResolvedAddresses(backendAddrs, "round_robin");
+ FakeLoadBalancer childBalancer = (FakeLoadBalancer) childBalancers.poll();
+ NoopSubchannel subchannel = childBalancer.subchannels.values().iterator().next();
+ deliverSubchannelState(subchannel, ConnectivityState.READY);
+ assertThat(loadRecorder.recording).isTrue();
+ ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
+ verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
+ SubchannelPicker picker = pickerCaptor.getValue();
+ assertThat(picker).isInstanceOf(LoadRecordingSubchannelPicker.class);
+ PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
+ ClientStreamTracer.Factory tracerFactory = result.getStreamTracerFactory();
+ assertThat(((LoadRecordingStreamTracerFactory) tracerFactory).getCounter())
+ .isSameInstanceAs(counter);
+ loadBalancer.shutdown();
+ assertThat(childBalancer.shutdown).isTrue();
+ assertThat(loadRecorder.recording).isFalse();
+ }
+
+ @Test
+ public void updateChildPolicy() {
+ List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
+ deliverResolvedAddresses(backendAddrs, "round_robin");
+ FakeLoadBalancer childBalancer = (FakeLoadBalancer) childBalancers.poll();
+ assertThat(childBalancer.name).isEqualTo("round_robin");
+ deliverResolvedAddresses(backendAddrs, "pick_first");
+ assertThat(childBalancer.shutdown).isTrue();
+ childBalancer = (FakeLoadBalancer) childBalancers.poll();
+ assertThat(childBalancer.name).isEqualTo("pick_first");
+ loadBalancer.shutdown();
+ assertThat(childBalancer.shutdown).isTrue();
+ }
+
+ @Test
+ public void errorPropagation() {
+ loadBalancer.handleNameResolutionError(Status.UNKNOWN.withDescription("I failed"));
+ ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
+ verify(helper)
+ .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
+ Status status =
+ pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)).getStatus();
+ assertThat(status.getDescription()).contains("I failed");
+
+ List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
+ deliverResolvedAddresses(backendAddrs, "round_robin");
+ // Error after child policy is created.
+ loadBalancer.handleNameResolutionError(Status.UNKNOWN.withDescription("I failed"));
+ verify(helper, times(2))
+ .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
+ status = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)).getStatus();
+ assertThat(status.getDescription()).contains("I failed");
+ assertThat(status.getDescription()).contains("handled by downstream balancer");
+ }
+
+ private void deliverResolvedAddresses(
+ List<EquivalentAddressGroup> addresses, String childPolicy) {
+ PolicySelection childPolicyConfig =
+ new PolicySelection(new FakeLoadBalancerProvider(childPolicy), null, null);
+ LrsConfig config =
+ new LrsConfig(
+ CLUSTER_NAME, EDS_SERVICE_NAME, LRS_SERVER_NAME, TEST_LOCALITY, childPolicyConfig);
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(addresses)
+ .setAttributes(
+ Attributes.newBuilder()
+ .set(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE, loadRecorder)
+ .build())
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+ loadBalancer.handleResolvedAddresses(resolvedAddresses);
+ }
+
+ private static List<EquivalentAddressGroup> createResolvedBackendAddresses(int n) {
+ List<EquivalentAddressGroup> list = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
+ list.add(new EquivalentAddressGroup(addr));
+ }
+ return list;
+ }
+
+ private static void deliverSubchannelState(
+ final NoopSubchannel subchannel, ConnectivityState state) {
+ SubchannelPicker picker = new SubchannelPicker() {
+ @Override
+ public PickResult pickSubchannel(PickSubchannelArgs args) {
+ return PickResult.withSubchannel(subchannel);
+ }
+ };
+ subchannel.helper.updateBalancingState(state, picker);
+ }
+
+ private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
+ private final String policyName;
+
+ FakeLoadBalancerProvider(String policyName) {
+ this.policyName = policyName;
+ }
+
+ @Override
+ public LoadBalancer newLoadBalancer(Helper helper) {
+ LoadBalancer balancer = new FakeLoadBalancer(helper, policyName);
+ childBalancers.add(balancer);
+ return balancer;
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0; // doesn't matter
+ }
+
+ @Override
+ public String getPolicyName() {
+ return policyName;
+ }
+ }
+
+ private static final class FakeLoadBalancer extends LoadBalancer {
+ private final Helper helper;
+ private final String name;
+ private boolean shutdown;
+ private final Map<EquivalentAddressGroup, NoopSubchannel> subchannels = new HashMap<>();
+
+ FakeLoadBalancer(Helper helper, String name) {
+ this.helper = helper;
+ this.name = name;
+ }
+
+ @Override
+ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
+ List<EquivalentAddressGroup> addresses = resolvedAddresses.getAddresses();
+ for (EquivalentAddressGroup eag : addresses) {
+ subchannels.put(eag, new NoopSubchannel(helper));
+ }
+ }
+
+ @Override
+ public void handleNameResolutionError(final Status error) {
+ SubchannelPicker picker = new SubchannelPicker() {
+ @Override
+ public PickResult pickSubchannel(PickSubchannelArgs args) {
+ return PickResult.withError(error.augmentDescription("handled by downstream balancer"));
+ }
+ };
+ helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, picker);
+ }
+
+ @Override
+ public void shutdown() {
+ shutdown = true;
+ }
+ }
+
+ private static final class NoopSubchannel extends Subchannel {
+ final Helper helper;
+
+ NoopSubchannel(Helper helper) {
+ this.helper = helper;
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public void requestConnection() {
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return Attributes.EMPTY;
+ }
+ }
+
+ private static final class FakeSocketAddress extends SocketAddress {
+ final String name;
+
+ FakeSocketAddress(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return "FakeSocketAddress-" + name;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof FakeSocketAddress) {
+ FakeSocketAddress otherAddr = (FakeSocketAddress) other;
+ return name.equals(otherAddr.name);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+ }
+
+ private final class LoadRecorder implements LoadStatsStore {
+ private boolean recording = false;
+
+ @Override
+ public ClusterStats generateLoadReport() {
+ throw new UnsupportedOperationException("should not be called");
+ }
+
+ @Override
+ public void addLocality(Locality locality) {
+ assertThat(locality).isEqualTo(TEST_LOCALITY);
+ recording = true;
+ }
+
+ @Override
+ public void removeLocality(Locality locality) {
+ assertThat(locality).isEqualTo(TEST_LOCALITY);
+ recording = false;
+ }
+
+ @Override
+ public ClientLoadCounter getLocalityCounter(Locality locality) {
+ assertThat(locality).isEqualTo(TEST_LOCALITY);
+ return counter;
+ }
+
+ @Override
+ public void recordDroppedRequest(String category) {
+ throw new UnsupportedOperationException("should not be called");
+ }
+ }
+}
diff --git a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java
index 6522c0871..53b17a1eb 100644
--- a/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java
+++ b/xds/src/test/java/io/grpc/xds/XdsClientTestHelper.java
@@ -176,7 +176,7 @@ class XdsClientTestHelper {
}
static io.envoyproxy.envoy.api.v2.endpoint.LocalityLbEndpoints buildLocalityLbEndpoints(
- String region, String zone, String subzone,
+ String region, String zone, String subZone,
List<io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint> lbEndpoints,
int loadBalancingWeight, int priority) {
return
@@ -185,7 +185,7 @@ class XdsClientTestHelper {
io.envoyproxy.envoy.api.v2.core.Locality.newBuilder()
.setRegion(region)
.setZone(zone)
- .setSubZone(subzone))
+ .setSubZone(subZone))
.addAllLbEndpoints(lbEndpoints)
.setLoadBalancingWeight(UInt32Value.newBuilder().setValue(loadBalancingWeight))
.setPriority(priority)