diff options
author | Tony An <tonyjan@google.com> | 2023-08-14 15:23:40 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-14 15:23:40 -0700 |
commit | fba7835de1d57664d9e3434707988cc8ae7aa80a (patch) | |
tree | 5a3a298e4dbf113cb7242ca15a333f8dc619d006 | |
parent | 778c209751bf4d3ca3f5437ba8089d7bddc5b6bc (diff) | |
download | grpc-grpc-java-fba7835de1d57664d9e3434707988cc8ae7aa80a.tar.gz |
new pick first policy, architectural change (#10354)
-rw-r--r-- | core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java | 477 | ||||
-rw-r--r-- | core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java | 2131 |
2 files changed, 2608 insertions, 0 deletions
diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java new file mode 100644 index 000000000..1ea61252e --- /dev/null +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -0,0 +1,477 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.ExperimentalApi; +import io.grpc.LoadBalancer; +import io.grpc.Status; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + +/** + * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link + * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address + * list and sticking to the first that works. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10383") +final class PickFirstLeafLoadBalancer extends LoadBalancer { + private final Helper helper; + private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>(); + private Index addressIndex; + private ConnectivityState currentState = IDLE; + + PickFirstLeafLoadBalancer(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + } + + @Override + public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses(); + if (servers.isEmpty()) { + handleNameResolutionError(Status.UNAVAILABLE.withDescription( + "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses() + + ", attrs=" + resolvedAddresses.getAttributes())); + return false; + } + for (EquivalentAddressGroup eag : servers) { + if (eag == null) { + handleNameResolutionError(Status.UNAVAILABLE.withDescription( + "NameResolver returned address list with null endpoint. addrs=" + + resolvedAddresses.getAddresses() + ", attrs=" + resolvedAddresses.getAttributes())); + return false; + } + } + // We can optionally be configured to shuffle the address list. This can help better distribute + // the load. + if (resolvedAddresses.getLoadBalancingPolicyConfig() + instanceof PickFirstLeafLoadBalancerConfig) { + PickFirstLeafLoadBalancerConfig config + = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + if (config.shuffleAddressList != null && config.shuffleAddressList) { + servers = new ArrayList<EquivalentAddressGroup>(servers); + Collections.shuffle(servers, + config.randomSeed != null ? new Random(config.randomSeed) : new Random()); + } + } + + final List<EquivalentAddressGroup> newImmutableAddressGroups = + Collections.unmodifiableList(new ArrayList<>(servers)); + + if (addressIndex == null) { + addressIndex = new Index(newImmutableAddressGroups); + } else if (currentState == READY) { + // If a ready subchannel exists in new address list, + // keep this connection and don't create new subchannels + SocketAddress previousAddress = addressIndex.getCurrentAddress(); + addressIndex.updateGroups(newImmutableAddressGroups); + if (addressIndex.seekTo(previousAddress)) { + return true; + } + addressIndex.reset(); + } else { + addressIndex.updateGroups(newImmutableAddressGroups); + } + + // Create subchannels for all new addresses, preserving existing connections + Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet()); + Set<SocketAddress> newAddrs = new HashSet<>(); + for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) { + for (SocketAddress addr : endpoint.getAddresses()) { + newAddrs.add(addr); + if (!subchannels.containsKey(addr)) { + createNewSubchannel(addr); + } + } + } + + // remove old subchannels that were not in new address list + for (SocketAddress oldAddr : oldAddrs) { + if (!newAddrs.contains(oldAddr)) { + subchannels.get(oldAddr).getSubchannel().shutdown(); + subchannels.remove(oldAddr); + } + } + + if (oldAddrs.size() == 0 || currentState == CONNECTING || currentState == READY) { + // start connection attempt at first address + updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + requestConnection(); + + } else if (currentState == IDLE) { + // start connection attempt at first address when requested + SubchannelPicker picker = new RequestConnectionPicker(this); + updateBalancingState(IDLE, picker); + + } else if (currentState == TRANSIENT_FAILURE) { + // start connection attempt at first address + requestConnection(); + } + + return true; + } + + @Override + public void handleNameResolutionError(Status error) { + for (SubchannelData subchannelData : subchannels.values()) { + subchannelData.getSubchannel().shutdown(); + } + subchannels.clear(); + // NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine + // for time being. + updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); + } + + void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + ConnectivityState newState = stateInfo.getState(); + // Shutdown channels/previously relevant subchannels can still callback with state updates. + // To prevent pickers from returning these obselete subchannels, this logic + // is included to check if the current list of active subchannels includes this subchannel. + if (!subchannels.containsKey(getAddress(subchannel)) + || subchannels.get(getAddress(subchannel)).getSubchannel() != subchannel) { + return; + } + if (newState == SHUTDOWN) { + return; + } + if (newState == IDLE) { + helper.refreshNameResolution(); + } + // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state + // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky + // transient failure". Only a subchannel state change to READY will get the LB out of + // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we + // keep retrying for a connection. + + // With the new pick first implementation, individual subchannels will have their own backoff + // on a per-address basis. Thus, iterative requests for connections will not be requested + // once the first pass through is complete. + // However, every time there is an address update, we will perform a pass through for the new + // addresses in the updated list. + subchannels.get(getAddress(subchannel)).updateState(newState); + if (currentState == TRANSIENT_FAILURE) { + if (newState == CONNECTING) { + // each subchannel is responsible for its own backoff + return; + } else if (newState == IDLE) { + requestConnection(); + return; + } + } + + switch (newState) { + case IDLE: + // Shutdown when ready: connect from beginning when prompted + addressIndex.reset(); + updateBalancingState(IDLE, new RequestConnectionPicker(this));; + break; + case CONNECTING: + // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave + // the current picker in-place. But ignoring the potential optimization is simpler. + updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + break; + case READY: + updateBalancingState(READY, new Picker(PickResult.withSubchannel(subchannel))); + shutdownRemaining(subchannel); + addressIndex.seekTo(getAddress(subchannel)); + break; + case TRANSIENT_FAILURE: + // If we are looking at current channel, request a connection if possible + if (addressIndex.isValid() + && subchannels.get(addressIndex.getCurrentAddress()).getSubchannel() == subchannel) { + addressIndex.increment(); + requestConnection(); + + // If no addresses remaining, go into TRANSIENT_FAILURE + if (!addressIndex.isValid()) { + helper.refreshNameResolution(); + updateBalancingState(TRANSIENT_FAILURE, + new Picker(PickResult.withError(stateInfo.getStatus()))); + } + } + break; + default: + throw new IllegalArgumentException("Unsupported state:" + newState); + } + } + + private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) { + if (state != currentState || state == READY || state == TRANSIENT_FAILURE) { + currentState = state; + helper.updateBalancingState(state, picker); + } + } + + @Override + public void shutdown() { + for (SubchannelData subchannelData : subchannels.values()) { + subchannelData.getSubchannel().shutdown(); + } + subchannels.clear(); + } + + /** + * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means + * that all other subchannels must be shutdown. + */ + private void shutdownRemaining(Subchannel activeSubchannel) { + for (SubchannelData subchannelData : subchannels.values()) { + if (!subchannelData.getSubchannel().equals(activeSubchannel)) { + subchannelData.getSubchannel().shutdown(); + } + } + subchannels.clear(); + subchannels.put(getAddress(activeSubchannel), new SubchannelData(activeSubchannel, READY)); + } + + /** + * Requests a connection to the next applicable address' subchannel, creating one if necessary + * If the current channel has already attempted a connection, we attempt a connection + * to the next address/subchannel in our list. + */ + @Override + public void requestConnection() { + if (subchannels.size() == 0) { + return; + } + if (addressIndex.isValid()) { + Subchannel subchannel = subchannels.containsKey(addressIndex.getCurrentAddress()) + ? subchannels.get(addressIndex.getCurrentAddress()).getSubchannel() + : createNewSubchannel(addressIndex.getCurrentAddress()); + + ConnectivityState subchannelState = + subchannels.get(addressIndex.getCurrentAddress()).getState(); + if (subchannelState == IDLE) { + subchannel.requestConnection(); + } else if (subchannelState == CONNECTING || subchannelState == TRANSIENT_FAILURE) { + addressIndex.increment(); + requestConnection(); + } + } + } + + private Subchannel createNewSubchannel(SocketAddress addr) { + final Subchannel subchannel = helper.createSubchannel( + CreateSubchannelArgs.newBuilder() + .setAddresses(Lists.newArrayList( + new EquivalentAddressGroup(addr))) + .build()); + subchannels.put(addr, new SubchannelData(subchannel, IDLE)); + subchannel.start(new SubchannelStateListener() { + @Override + public void onSubchannelState(ConnectivityStateInfo stateInfo) { + processSubchannelState(subchannel, stateInfo); + } + }); + return subchannel; + } + + private SocketAddress getAddress(Subchannel subchannel) { + return subchannel.getAddresses().getAddresses().get(0); + } + + @VisibleForTesting + ConnectivityState getCurrentState() { + return this.currentState; + } + + /** + * No-op picker which doesn't add any custom picking logic. It just passes already known result + * received in constructor. + */ + private static final class Picker extends SubchannelPicker { + private final PickResult result; + + Picker(PickResult result) { + this.result = checkNotNull(result, "result"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return result; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(Picker.class).add("result", result).toString(); + } + } + + /** + * Picker that requests connection during the first pick, and returns noResult. + */ + private final class RequestConnectionPicker extends SubchannelPicker { + private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer; + private final AtomicBoolean connectionRequested = new AtomicBoolean(false); + + RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) { + this.pickFirstLeafLoadBalancer = + checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + if (connectionRequested.compareAndSet(false, true)) { + helper.getSynchronizationContext().execute(new Runnable() { + @Override + public void run() { + pickFirstLeafLoadBalancer.requestConnection(); + } + }); + } + return PickResult.withNoResult(); + } + } + + /** + * Index as in 'i', the pointer to an entry. Not a "search index." + */ + @VisibleForTesting + static final class Index { + private List<EquivalentAddressGroup> addressGroups; + private int groupIndex; + private int addressIndex; + + public Index(List<EquivalentAddressGroup> groups) { + this.addressGroups = groups; + } + + public boolean isValid() { + // addressIndex will never be invalid + return groupIndex < addressGroups.size(); + } + + public boolean isAtBeginning() { + return groupIndex == 0 && addressIndex == 0; + } + + public void increment() { + EquivalentAddressGroup group = addressGroups.get(groupIndex); + addressIndex++; + if (addressIndex >= group.getAddresses().size()) { + groupIndex++; + addressIndex = 0; + } + } + + public void reset() { + groupIndex = 0; + addressIndex = 0; + } + + public SocketAddress getCurrentAddress() { + return addressGroups.get(groupIndex).getAddresses().get(addressIndex); + } + + public Attributes getCurrentEagAttributes() { + return addressGroups.get(groupIndex).getAttributes(); + } + + public List<EquivalentAddressGroup> getGroups() { + return addressGroups; + } + + /** + * Update to new groups, resetting the current index. + */ + public void updateGroups(List<EquivalentAddressGroup> newGroups) { + addressGroups = newGroups; + reset(); + } + + /** + * Returns false if the needle was not found and the current index was left unchanged. + */ + public boolean seekTo(SocketAddress needle) { + for (int i = 0; i < addressGroups.size(); i++) { + EquivalentAddressGroup group = addressGroups.get(i); + int j = group.getAddresses().indexOf(needle); + if (j == -1) { + continue; + } + this.groupIndex = i; + this.addressIndex = j; + return true; + } + return false; + } + } + + private static final class SubchannelData { + private final Subchannel subchannel; + private ConnectivityState state; + + public SubchannelData(Subchannel subchannel, ConnectivityState state) { + this.subchannel = subchannel; + this.state = state; + } + + public Subchannel getSubchannel() { + return this.subchannel; + } + + public ConnectivityState getState() { + return this.state; + } + + private void updateState(ConnectivityState newState) { + this.state = newState; + } + } + + public static final class PickFirstLeafLoadBalancerConfig { + + @Nullable + public final Boolean shuffleAddressList; + + // For testing purposes only, not meant to be parsed from a real config. + @Nullable + final Long randomSeed; + + public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) { + this(shuffleAddressList, null); + } + + PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList, + @Nullable Long randomSeed) { + this.shuffleAddressList = shuffleAddressList; + this.randomSeed = randomSeed; + } + } +} diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java new file mode 100644 index 000000000..5437cbc91 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -0,0 +1,2131 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Lists; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer.CreateSubchannelArgs; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + + +/** Unit test for {@link PickFirstLeafLoadBalancer}. */ +@RunWith(JUnit4.class) +public class PickFirstLeafLoadBalancerTest { + private PickFirstLeafLoadBalancer loadBalancer; + private final List<EquivalentAddressGroup> servers = Lists.newArrayList(); + private static final Attributes.Key<String> FOO = Attributes.Key.create("foo"); + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build(); + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + @Captor + private ArgumentCaptor<SubchannelPicker> pickerCaptor; + @Captor + private ArgumentCaptor<ConnectivityState> connectivityStateCaptor; + @Captor + private ArgumentCaptor<CreateSubchannelArgs> createArgsCaptor; + @Captor + private ArgumentCaptor<SubchannelStateListener> stateListenerCaptor; + @Mock + private Helper mockHelper; + @Mock + private FakeSubchannel mockSubchannel1; + @Mock + private FakeSubchannel mockSubchannel2; + @Mock + private FakeSubchannel mockSubchannel3; + @Mock + private FakeSubchannel mockSubchannel4; + @Mock // This LoadBalancer doesn't use any of the arg fields, as verified in tearDown(). + private PickSubchannelArgs mockArgs; + + @Before + public void setUp() { + for (int i = 1; i < 5; i++) { + SocketAddress addr = new FakeSocketAddress("server" + i); + servers.add(new EquivalentAddressGroup(addr)); + } + mockSubchannel1 = new FakeSubchannel(Lists.newArrayList( + new EquivalentAddressGroup(new FakeSocketAddress("fake"))), null); + mockSubchannel1 = mock(FakeSubchannel.class); + mockSubchannel2 = mock(FakeSubchannel.class); + mockSubchannel3 = mock(FakeSubchannel.class); + mockSubchannel4 = mock(FakeSubchannel.class); + when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))) + .thenReturn(mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); + + when(mockSubchannel1.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); + when(mockSubchannel2.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(1))); + when(mockSubchannel3.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(2))); + when(mockSubchannel4.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(3))); + + when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); + loadBalancer = new PickFirstLeafLoadBalancer(mockHelper); + } + + @After + public void tearDown() throws Exception { + verifyNoMoreInteractions(mockArgs); + } + + @Test + public void pickAfterResolved() throws Exception { + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); + List<CreateSubchannelArgs> argsList = createArgsCaptor.getAllValues(); + assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(0)); + assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); + assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); + assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); + assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel1).requestConnection(); + + // Calling pickSubchannel() twice gave the same result + assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), + pickerCaptor.getValue().pickSubchannel(mockArgs)); + + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void pickAfterResolved_shuffle() throws Exception { + servers.remove(3); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity) + .setLoadBalancingPolicyConfig(new PickFirstLeafLoadBalancerConfig(true, 123L)).build()); + + verify(mockHelper, times(3)).createSubchannel(createArgsCaptor.capture()); + List<CreateSubchannelArgs> argsList = createArgsCaptor.getAllValues(); + // We should still see the same set of addresses. + // Because we use a fixed seed, the addresses should always be shuffled in this order. + assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(1)); + assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(0)); + assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); + assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); + + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel1).requestConnection(); + + // Calling pickSubchannel() twice gave the same result + assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), + pickerCaptor.getValue().pickSubchannel(mockArgs)); + + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void pickAfterResolved_noShuffle() throws Exception { + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity) + .setLoadBalancingPolicyConfig(new PickFirstLeafLoadBalancerConfig(false)).build()); + + verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); + List<CreateSubchannelArgs> argsList = createArgsCaptor.getAllValues(); + assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(0)); + assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); + assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); + assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); + assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel1).requestConnection(); + + // Calling pickSubchannel() twice gave the same result + assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), + pickerCaptor.getValue().pickSubchannel(mockArgs)); + + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void requestConnectionPicker() throws Exception { + // Set up + assertEquals(IDLE, loadBalancer.getCurrentState()); + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0), servers.get(1), + servers.get(2)); + + // Accepting resolved addresses + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); + + // We initialize and start all subchannels + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + + // We start connection attempt to the first address in the list + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // If we send the first subchannel into idle ... + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + + SubchannelPicker picker = pickerCaptor.getValue(); + + // Calling pickSubchannel() requests a connection, gives the same result when called twice. + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + + // But the picker calls requestConnection() only once for a total of two connection requests. + inOrder.verify(mockSubchannel1).requestConnection(); + verify(mockSubchannel1, times(2)).requestConnection(); + } + + @Test + public void refreshNameResolutionAfterSubchannelConnectionBroken() { + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0)); + when(mockSubchannel1.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); + + // accept resolved addresses + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + InOrder inOrder = inOrder(mockHelper, mockSubchannel1); + verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + inOrder.verify(mockSubchannel1).requestConnection(); + + Status error = Status.UNAUTHENTICATED.withDescription("permission denied"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertEquals(mockSubchannel1, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + + // Simulate receiving go-away so the subchannel transit to IDLE. + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + } + + @Test + public void pickAfterResolvedAndUnchanged() throws Exception { + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + verify(mockSubchannel1).requestConnection(); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + verify(mockSubchannel1).requestConnection(); + + verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); + verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + assertThat(createArgsCaptor.getValue()).isNotNull(); + verify(mockHelper) + .updateBalancingState(isA(ConnectivityState.class), isA(SubchannelPicker.class)); + + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void pickAfterResolvedAndChanged() throws Exception { + SocketAddress socketAddr1 = new FakeSocketAddress("oldserver"); + List<EquivalentAddressGroup> oldServers = + Lists.newArrayList(new EquivalentAddressGroup(socketAddr1)); + + SocketAddress socketAddr2 = new FakeSocketAddress("newserver"); + List<EquivalentAddressGroup> newServers = + Lists.newArrayList(new EquivalentAddressGroup(socketAddr2)); + + InOrder inOrder = inOrder(mockHelper, mockSubchannel1); + + // accept resolved addresses + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel1).start(any(SubchannelStateListener.class)); + + // start connection attempt to first address + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel1).requestConnection(); + + assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + + // updating the subchannel addresses is unnecessary, but doesn't hurt anything + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + verify(mockSubchannel1).shutdown(); + + verifyNoMoreInteractions(mockSubchannel1); + verify(mockSubchannel2).start(any(SubchannelStateListener.class)); + } + + @Test + public void pickAfterStateChangeAfterResolution() throws Exception { + InOrder inOrder = inOrder(mockHelper); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + inOrder.verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); + List<CreateSubchannelArgs> argsList = createArgsCaptor.getAllValues(); + assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(0)); + assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); + assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); + assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); + assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + verify(mockSubchannel4).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + reset(mockHelper); + when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); + + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + + // subchannel reports connecting when pick subchannel is called + assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + Status error = Status.UNAVAILABLE.withDescription("boom!"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener4.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(mockSubchannel1, picker.pickSubchannel(mockArgs).getSubchannel()); + + verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void pickAfterResolutionAfterTransientValue() throws Exception { + InOrder inOrder = inOrder(mockHelper); + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0)); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel1).requestConnection(); + reset(mockHelper); + when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); + + // An error has happened. + Status error = Status.UNAVAILABLE.withDescription("boom!"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(mockHelper).refreshNameResolution(); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + + // Transition from TRANSIENT_ERROR to CONNECTING should also be ignored. + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + verifyNoMoreInteractions(mockHelper); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + } + + @Test + public void nameResolutionError() throws Exception { + Status error = Status.NOT_FOUND.withDescription("nameResolutionError"); + loadBalancer.handleNameResolutionError(error); + verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertNull(pickResult.getSubchannel()); + assertEquals(error, pickResult.getStatus()); + verify(mockSubchannel1, never()).requestConnection(); + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void nameResolutionError_emptyAddressList() throws Exception { + servers.clear(); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + verify(mockHelper).updateBalancingState(connectivityStateCaptor.capture(), + pickerCaptor.capture()); + PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(pickResult.getSubchannel()).isNull(); + assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(pickResult.getStatus().getDescription()).contains("returned no usable address"); + verify(mockSubchannel1, never()).requestConnection(); + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void nameResolutionSuccessAfterError() throws Exception { + InOrder inOrder = inOrder(mockHelper); + + loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError")); + inOrder.verify(mockHelper) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + verify(mockSubchannel1, never()).requestConnection(); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + inOrder.verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); + List<CreateSubchannelArgs> argsList = createArgsCaptor.getAllValues(); + assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(0)); + assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); + assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); + assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); + assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(0).getAttributes()).isEqualTo(Attributes.EMPTY); + assertThat(argsList.get(1).getAttributes()).isEqualTo(Attributes.EMPTY); + assertThat(argsList.get(2).getAttributes()).isEqualTo(Attributes.EMPTY); + assertThat(argsList.get(3).getAttributes()).isEqualTo(Attributes.EMPTY); + + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel1).requestConnection(); + + assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs) + .getSubchannel()); + + assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), + pickerCaptor.getValue().pickSubchannel(mockArgs)); + + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void nameResolutionErrorWithStateChanges() throws Exception { + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0)); + InOrder inOrder = inOrder(mockHelper); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState( + eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + Status error = Status.NOT_FOUND.withDescription("nameResolutionError"); + loadBalancer.handleNameResolutionError(error); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertNull(pickResult.getSubchannel()); + assertEquals(error, pickResult.getStatus()); + + Status error2 = Status.NOT_FOUND.withDescription("nameResolutionError2"); + loadBalancer.handleNameResolutionError(error2); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + + pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertNull(pickResult.getSubchannel()); + assertEquals(error2, pickResult.getStatus()); + + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void requestConnection() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + inOrder.verify(mockSubchannel1).requestConnection(); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + + // calling requestConnection() starts next subchannel + loadBalancer.requestConnection(); + inOrder.verify(mockSubchannel2).requestConnection(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + + // calling requestConnection is now a no-op + loadBalancer.requestConnection(); + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void updateAddresses_emptyEagList_returns_false() { + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + assertFalse(loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(Arrays.<EquivalentAddressGroup>asList()).setAttributes(affinity).build())); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + } + + @Test + public void updateAddresses_eagListWithNull_returns_false() { + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + List<EquivalentAddressGroup> eags = Arrays.asList((EquivalentAddressGroup) null); + assertFalse(loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(eags).setAttributes(affinity).build())); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + } + + @Test + public void updateAddresses_disjoint_idle() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is successful + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // Going into IDLE state + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + assertEquals(IDLE, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + picker = pickerCaptor.getValue(); + + // Creating second set of disjoint endpoints/addresses + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(2), servers.get(3)); + + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + + // We create new channels, remove old ones, and keep intersecting ones + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + assertEquals(IDLE, loadBalancer.getCurrentState()); + verify(mockSubchannel1).shutdown(); + verify(mockSubchannel2).shutdown(); + assertEquals(IDLE, loadBalancer.getCurrentState()); + + // If obselete subchannel becomes ready, the state should not be affected + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(IDLE, loadBalancer.getCurrentState()); + + // Calling pickSubchannel() twice gave the same result + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + + // But the picker calls requestConnection() only once + inOrder.verify(mockSubchannel3).requestConnection(); + + // Ready subchannel 3 + stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + // Picking a subchannel returns subchannel 3 + picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_disjoint_connecting() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + + // Creating second set of endpoints/addresses + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(2), servers.get(3)); + + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Old subchannels should shut down (in no particular order) and request a connection + verify(mockSubchannel1).shutdown(); + verify(mockSubchannel2).shutdown(); + inOrder.verify(mockSubchannel3).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // If old subchannel becomes ready, the state should not be affected + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Fail connection attempt to third address + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Verify starting connection attempt to fourth address + inOrder.verify(mockSubchannel4).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Succeed connection attempt to fourth address + stateListener4.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_disjoint_ready_twice() { + when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))) + .thenReturn(mockSubchannel1, mockSubchannel2, mockSubchannel3, + mockSubchannel4, mockSubchannel1, mockSubchannel2); + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is successful + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Successful connection shuts down other subchannel + inOrder.verify(mockSubchannel2).shutdown(); + + // Verify that picker returns correct subchannel + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // Creating second set of endpoints/addresses + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(2), servers.get(3)); + + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).shutdown(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + picker = pickerCaptor.getValue(); + + // If obselete subchannel becomes ready, the state should not be affected + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Calling pickSubchannel() twice gave the same result + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + + // But the picker calls requestConnection() only once + inOrder.verify(mockSubchannel3).requestConnection(); + picker = pickerCaptor.getValue(); + assertEquals(PickResult.withNoResult(), picker.pickSubchannel(mockArgs)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Ready subchannel 3 + stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + picker = pickerCaptor.getValue(); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Successful connection shuts down other subchannel + inOrder.verify(mockSubchannel4).shutdown(); + + // Verify that pickSubchannel() returns correct subchannel + assertEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // Creating third set of endpoints/addresses + List<EquivalentAddressGroup> newestServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Second address update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newestServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel3).shutdown(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + picker = pickerCaptor.getValue(); + + // Calling pickSubchannel() twice gave the same result + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + + // But the picker calls requestConnection() only once + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // If obselete subchannel becomes ready, the state should not be affected + stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + stateListener4.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Connection attempt to address 1 is unsuccessful + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting connection attempt to address 2 + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Connection attempt to address 2 is successful + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Successful connection shuts down other subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + picker = pickerCaptor.getValue(); + inOrder.verify(mockSubchannel1).shutdown(); + + // Verify that picker still returns correct subchannel + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_disjoint_transient_failure() { + // Starting first connection attempt + when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))) + .thenReturn(mockSubchannel1, mockSubchannel2, mockSubchannel3, + mockSubchannel4, mockSubchannel1, mockSubchannel2); + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> addrs = Lists.newArrayList(servers.get(0), servers.get(1)); + + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing first connection attempt + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting second connection attempt + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing second connection attempt + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); // sticky transient failure + + // Creating second set of endpoints/addresses + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(2), servers.get(3)); + + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + + // subchannel 3 still attempts a connection even though we stay in transient failure + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); + verify(mockSubchannel1).shutdown(); + verify(mockSubchannel2).shutdown(); + inOrder.verify(mockSubchannel3).requestConnection(); + + // Obselete subchannels should not affect us + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Third subchannel connection attempt is unsuccessful + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + inOrder.verify(mockSubchannel4).requestConnection(); + + // Fourth subchannel connection attempt is successful + stateListener4.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Picking a subchannel returns subchannel 3 + SubchannelPicker picker = pickerCaptor.getValue(); + inOrder.verify(mockSubchannel3).shutdown(); + assertEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_intersecting_idle() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = + Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + inOrder.verify(mockSubchannel1).requestConnection(); + + // First connection attempt is successful + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Successful connection attempt shuts down other subchannels + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + inOrder.verify(mockSubchannel2).shutdown(); + + // Verify that picker returns correct subchannel + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // Going into IDLE state, nothing should happen unless requested + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + assertEquals(IDLE, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + picker = pickerCaptor.getValue(); + verifyNoMoreInteractions(mockHelper); + + // Creating second set of intersecting endpoints/addresses + List<EquivalentAddressGroup> newServers = + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(3)); + + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + assertEquals(IDLE, loadBalancer.getCurrentState()); + + // We create new channels and remove old ones, keeping intersecting ones + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + + // If obselete subchannel becomes ready, the state should not be affected + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(IDLE, loadBalancer.getCurrentState()); + + // Calling pickSubchannel() twice gave the same result + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + + // But the picker calls requestConnection() only once + inOrder.verify(mockSubchannel1).requestConnection(); + + // internal subchannel calls back and reports connecting + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Ready subchannel 1 + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + // Picking a subchannel returns subchannel 1 + picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_intersecting_connecting() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(2)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + inOrder.verify(mockSubchannel1).requestConnection(); + + // callback from internal subchannel + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + + // Creating second set of endpoints/addresses + List<EquivalentAddressGroup> newServers = + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(3)); + + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Don't unnecessarily create new subchannels and keep intersecting ones + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + verifyNoMoreInteractions(mockHelper); + + // If obselete subchannel becomes ready, the state should not be affected + stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is successful + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_intersecting_ready() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = + Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is successful + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + inOrder.verify(mockSubchannel2).shutdown(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // Creating second set of endpoints/addresses + List<EquivalentAddressGroup> newServers = + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(2)); + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + + // The state is still READY after update since we had an intersecting subchannel that was READY. + assertEquals(READY, loadBalancer.getCurrentState()); + + // Verify that picker still returns correct subchannel + picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_intersecting_transient_failure() { + // Starting first connection attempt + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); // captor: captures + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = + Lists.newArrayList(servers.get(0), servers.get(1)); + + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing first connection attempt + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting second connection attempt + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing second connection attempt + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); // sticky transient failure + + // Creating second set of endpoints/addresses + List<EquivalentAddressGroup> newServers = + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(2)); + + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + + // subchannel 3 still attempts a connection even though we stay in transient failure + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + inOrder.verify(mockSubchannel3).requestConnection(); + + // no other connections should be requested by LB, we should come out of backoff to request + verifyNoMoreInteractions(mockSubchannel3); + + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Second subchannel connection attempt is now successful + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + + // Picking a subchannel returns subchannel 3 + SubchannelPicker picker = pickerCaptor.getValue(); + inOrder.verify(mockSubchannel3).shutdown(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_intersecting_enter_transient_failure() { + // after an address update occurs, verify that the client properly tries all + // addresses and only then enters transient failure. + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + + // callback from internal subchannel + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + + // Creating second set of endpoints/addresses + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0), servers.get(2)); + + // Accept new resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // We create new channels and remove old ones, keeping intersecting ones + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockSubchannel2).shutdown(); + + // If obselete subchannel becomes ready, the state should not be affected + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is unsuccessful + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Subchannel 3 attempt starts but fails + inOrder.verify(mockSubchannel3).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + } + + @Test + public void updateAddresses_identical_idle() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is successful + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // Going into IDLE state + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + assertEquals(IDLE, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + + // Accept same resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + + // Verify that no new subchannels were created or started + verify(mockHelper, times(3)).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel1, times(1)).start(stateListenerCaptor.capture()); + verify(mockSubchannel2, times(1)).start(stateListenerCaptor.capture()); + + // First connection attempt is successful + assertEquals(IDLE, loadBalancer.getCurrentState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_identical_connecting() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Accept same resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + + // Verify that no new subchannels were created or started + verify(mockHelper, times(2)).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel1, times(1)).start(stateListenerCaptor.capture()); + verify(mockSubchannel2, times(1)).start(stateListenerCaptor.capture()); + + // First connection attempt is successful + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void updateAddresses_identical_ready() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is successful + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + inOrder.verify(mockSubchannel2).shutdown(); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // Accept same resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + + // Verify that no new subchannels were created or started + verify(mockHelper, times(2)).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel1, times(1)).start(stateListenerCaptor.capture()); + verify(mockSubchannel2, times(1)).start(stateListenerCaptor.capture()); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker hasn't changed via checking mock helper's interactions + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void updateAddresses_identical_transient_failure() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is unsuccessful + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Second connection attempt is unsuccessful + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Accept same resolved addresses to update + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + + // Verify that no new subchannels were created or started + verify(mockHelper, times(2)).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel1, times(1)).start(stateListenerCaptor.capture()); + verify(mockSubchannel2, times(1)).start(stateListenerCaptor.capture()); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // No new connections are requested, subchannels responsible for completing their own backoffs + verifyNoMoreInteractions(mockHelper); + + // First connection attempt is successful + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + inOrder.verify(mockSubchannel2).shutdown(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void twoAddressesSeriallyConnect() { + // Starting first connection attempt + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing first connection attempt + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting second connection attempt + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Second connection attempt is successful + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void multiple_backoffs() { + // This test case mimics a backoff without implementing one + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); // captor: captures + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Starting first connection attempt + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing first connection attempt + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting second connection attempt + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing second connection attempt + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Mimic backoff for first address + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Mimic backoff for second address + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Failing first connection attempt + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Failing second connection attempt + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Mimic backoff for first address + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Mimic backoff for second address + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Connection attempt to second address is now successful + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + when(mockSubchannel2.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); + inOrder.verify(mockSubchannel1).shutdown(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // If first subchannel is ready before it completes shutdown, we still choose subchannel 2 + // This can be verified by checking the mock helper. + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void backoff_faster_than_serial_connection() { + // this tests the case where a subchannel completes its backoff and readies a connection + // before the other addresses have a chance to complete their connection attempts + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); // captor: captures + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> addrs = Lists.newArrayList(servers.get(0), + servers.get(1)); + + // Accepting resolved addresses starts all subchannels + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing first connection attempt + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting second connection attempt + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Mimic backoff for first address + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Connection attempt to first address is now successful + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void success_from_transient_failure() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); // captor: captures + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> addrs = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accepting resolved addresses starts all subchannels + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing first connection attempt + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting second connection attempt + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Mimic backoff for first address + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing second connection attempt + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); // sticky transient failure + + // Failing connection attempt to first address + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Mimic backoff for second address + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Connection attempt to second address is now successful + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + + // If first address is successful, nothing happens. Verify by checking mock helper + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + verifyNoMoreInteractions(mockHelper); + } + + @Test + public void lastAddressFailingNotTransientFailure() { + // This tests the case where after an address update, the last address escapes a backoff + // and reports transient failure before all addresses have failed connection + // attempts, in which case we should not report TRANSIENT_FAILURE. + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + + // Accept Addresses and verify proper connection flow + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // First connection attempt is unsuccessful + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Second connection attempt is connecting + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Accept same resolved addresses to update + List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(2), servers.get(1)); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + + // Verify that no new subchannels were created or started + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockSubchannel1).shutdown(); + inOrder.verify(mockSubchannel3).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Second address connection attempt is unsuccessful, but should not go into transient failure + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Third address connection attempt is unsuccessful, now we enter transient failure + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Obselete subchannels have no impact + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getCurrentState()); + + // Second subchannel is successful + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + inOrder.verify(mockSubchannel3).shutdown(); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void recreate_shutdown_subchannel() { + // Take the case where the latter subchannel is readied. If we then go to an IDLE state and + // re-request a connection, we should start and create a new subchannel for the first + // address in our list. + + // Starting first connection attempt + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); // captor: captures + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> addrs = + Lists.newArrayList(servers.get(0), servers.get(1)); + + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing first connection attempt + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting second connection attempt + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Successful second connection attempt + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockSubchannel1).shutdown(); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Go to IDLE + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + assertEquals(IDLE, loadBalancer.getCurrentState()); + + SubchannelPicker picker = pickerCaptor.getValue(); + + // Calling pickSubchannel() requests a connection, gives the same result when called twice. + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockSubchannel3).requestConnection(); + when(mockSubchannel3.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); + stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // first subchannel connection attempt fails + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // second subchannel connection attempt + inOrder.verify(mockSubchannel2).requestConnection(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + inOrder.verify(mockSubchannel3).shutdown(); + picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void ready_then_transient_failure_again() { + // Starting first connection attempt + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, + mockSubchannel3, mockSubchannel4); // captor: captures + + // Creating first set of endpoints/addresses + List<EquivalentAddressGroup> addrs = + Lists.newArrayList(servers.get(0), servers.get(1)); + + assertEquals(IDLE, loadBalancer.getCurrentState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel1).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Failing first connection attempt + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Starting second connection attempt + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // Successful second connection attempt + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockSubchannel1).shutdown(); + assertEquals(READY, loadBalancer.getCurrentState()); + + // Go to IDLE + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + assertEquals(IDLE, loadBalancer.getCurrentState()); + + SubchannelPicker picker = pickerCaptor.getValue(); + + // Calling pickSubchannel() requests a connection, gives the same result when called twice. + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockSubchannel3).requestConnection(); + when(mockSubchannel3.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); + stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // first subchannel connection attempt fails + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getCurrentState()); + + // second subchannel connection attempt + inOrder.verify(mockSubchannel2).requestConnection(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getCurrentState()); + + // verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + inOrder.verify(mockSubchannel3).shutdown(); + picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); + assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + + @Test + public void index_looping() { + Attributes.Key<String> key = Attributes.Key.create("some-key"); + Attributes attr1 = Attributes.newBuilder().set(key, "1").build(); + Attributes attr2 = Attributes.newBuilder().set(key, "2").build(); + Attributes attr3 = Attributes.newBuilder().set(key, "3").build(); + SocketAddress addr1 = new FakeSocketAddress("addr1"); + SocketAddress addr2 = new FakeSocketAddress("addr2"); + SocketAddress addr3 = new FakeSocketAddress("addr3"); + SocketAddress addr4 = new FakeSocketAddress("addr4"); + SocketAddress addr5 = new FakeSocketAddress("addr5"); + PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1), + new EquivalentAddressGroup(Arrays.asList(addr3), attr2), + new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3))); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); + assertThat(index.isAtBeginning()).isTrue(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr2); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr5); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isFalse(); + + index.reset(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); + assertThat(index.isAtBeginning()).isTrue(); + assertThat(index.isValid()).isTrue(); + + // We want to make sure both groupIndex and addressIndex are reset + index.increment(); + index.increment(); + index.increment(); + index.increment(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr5); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr3); + index.reset(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); + assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1); + } + + @Test + public void index_updateGroups_resets() { + SocketAddress addr1 = new FakeSocketAddress("addr1"); + SocketAddress addr2 = new FakeSocketAddress("addr2"); + SocketAddress addr3 = new FakeSocketAddress("addr3"); + PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1)), + new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); + index.increment(); + index.increment(); + // We want to make sure both groupIndex and addressIndex are reset + index.updateGroups(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1)), + new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); + } + + @Test + public void index_seekTo() { + SocketAddress addr1 = new FakeSocketAddress("addr1"); + SocketAddress addr2 = new FakeSocketAddress("addr2"); + SocketAddress addr3 = new FakeSocketAddress("addr3"); + PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1, addr2)), + new EquivalentAddressGroup(Arrays.asList(addr3)))); + assertThat(index.seekTo(addr3)).isTrue(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3); + assertThat(index.seekTo(addr1)).isTrue(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); + assertThat(index.seekTo(addr2)).isTrue(); + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2); + index.seekTo(new FakeSocketAddress("addr4")); + // Failed seekTo doesn't change the index + assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2); + } + + private static class FakeSocketAddress extends SocketAddress { + final String name; + + FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public String toString() { + return "FakeSocketAddress-" + name; + } + + } + + private static class FakeSubchannel extends Subchannel { + private final Attributes attributes; + private List<EquivalentAddressGroup> eags; + private SubchannelStateListener listener; + + public FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes) { + this.eags = Collections.unmodifiableList(eags); + this.attributes = attributes; + } + + @Override + public List<EquivalentAddressGroup> getAllAddresses() { + return eags; + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + @Override + public void start(SubchannelStateListener listener) { + this.listener = checkNotNull(listener, "listener"); + } + + @Override + public void updateAddresses(List<EquivalentAddressGroup> addrs) { + this.eags = Collections.unmodifiableList(addrs); + } + + @Override + public void shutdown() { + } + + @Override + public void requestConnection() { + listener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + } + } +}
\ No newline at end of file |