diff options
author | Nick Hill <nickhill@us.ibm.com> | 2018-09-12 23:06:46 +0100 |
---|---|---|
committer | Carl Mastrangelo <notcarl@google.com> | 2018-09-12 15:06:46 -0700 |
commit | ed709ff9ff37e0b7b30be05efb63c1fd3cbb8741 (patch) | |
tree | 4617895a95ebf2da6a816e214a02ef13aacdd59a /core | |
parent | 7270938ebf8a4cfb30f70b298335d0708caef366 (diff) | |
download | grpc-grpc-java-ed709ff9ff37e0b7b30be05efb63c1fd3cbb8741.tar.gz |
core: remove redundant SubchannelPicker refreshes in RoundRobinLoadBalancer
* Remove redundant SubchannelPicker refreshes in RoundRobinLoadBalancer
- Ensure active subchannel list and round-robin index is only
regenerated/refreshed when it changes
- Make it so that Subchannels exist in subchannels map iff their state
!= SHUTDOWN
- Add EmptyPicker class since logic for this case is disjoint from the
non-empty case
* remove explicit initialization of boolean ready field
per @carl-mastrangelo's review comment
* minor restructuring to make logic clearer; more explanatory comments
* move some checks inside updateBalancingState method for clarity
* store current state and picker in RRLB, only update when new one is diff
* some more simplification/refactoring; improve test coverage
- remove now redundant check in handleSubchannelState
- collapse getAggregatedState() and getAggregatedError() into
handleBalancingState()
- have both pickers extend new RoundRobinPicker, move
areEquivalentPickers() logic into RoundRobinPicker.isEquivalentTo()
- extend unit tests to cover some additional cases
* Address latest review comments from @zhangkun83
- Use explicit check for non-empty list instead of assert
- Change EmptyPicker.status to be non-nullable
- Further test coverage improvement including explicit picker comparison
tests
* use EMPTY_OK instead of Status.OK for initial empty picker
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java | 213 | ||||
-rw-r--r-- | core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java | 153 |
2 files changed, 233 insertions, 133 deletions
diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index cfb989e5b..2e6e5e053 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -24,6 +24,9 @@ import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + import io.grpc.Attributes; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; @@ -42,12 +45,10 @@ import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ServiceConfigUtil; import java.util.ArrayList; import java.util.Collection; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Queue; import java.util.Random; import java.util.Set; @@ -113,6 +114,9 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { new HashMap<EquivalentAddressGroup, Subchannel>(); private final Random random; + private ConnectivityState currentState; + private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK); + @Nullable private StickinessState stickinessState; @@ -176,50 +180,92 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { // Shutdown subchannels for removed addresses. for (EquivalentAddressGroup addressGroup : removedAddrs) { Subchannel subchannel = subchannels.remove(addressGroup); - subchannel.shutdown(); + shutdownSubchannel(subchannel); } - updateBalancingState(getAggregatedState(), getAggregatedError()); + updateBalancingState(); } @Override public void handleNameResolutionError(Status error) { - updateBalancingState(TRANSIENT_FAILURE, error); + // ready pickers aren't affected by status changes + updateBalancingState(TRANSIENT_FAILURE, + currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error)); } @Override public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { - if (stateInfo.getState() == SHUTDOWN && stickinessState != null) { - stickinessState.remove(subchannel); - } if (subchannels.get(subchannel.getAddresses()) != subchannel) { return; } + if (stateInfo.getState() == SHUTDOWN && stickinessState != null) { + stickinessState.remove(subchannel); + } if (stateInfo.getState() == IDLE) { subchannel.requestConnection(); } getSubchannelStateInfoRef(subchannel).value = stateInfo; - updateBalancingState(getAggregatedState(), getAggregatedError()); + updateBalancingState(); + } + + private void shutdownSubchannel(Subchannel subchannel) { + subchannel.shutdown(); + getSubchannelStateInfoRef(subchannel).value = + ConnectivityStateInfo.forNonError(SHUTDOWN); + if (stickinessState != null) { + stickinessState.remove(subchannel); + } } @Override public void shutdown() { for (Subchannel subchannel : getSubchannels()) { - subchannel.shutdown(); + shutdownSubchannel(subchannel); } } + private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready"); + /** * Updates picker with the list of active subchannels (state == READY). */ - private void updateBalancingState(ConnectivityState state, Status error) { + @SuppressWarnings("ReferenceEquality") + private void updateBalancingState() { List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels()); - // initialize the Picker to a random start index to ensure that a high frequency of Picker - // churn does not skew subchannel selection. - int startIndex = activeList.isEmpty() ? 0 : random.nextInt(activeList.size()); - helper.updateBalancingState( - state, - new Picker(activeList, error, startIndex, stickinessState)); + if (activeList.isEmpty()) { + // No READY subchannels, determine aggregate state and error status + boolean isConnecting = false; + Status aggStatus = EMPTY_OK; + for (Subchannel subchannel : getSubchannels()) { + ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value; + // This subchannel IDLE is not because of channel IDLE_TIMEOUT, + // in which case LB is already shutdown. + // RRLB will request connection immediately on subchannel IDLE. + if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { + isConnecting = true; + } + if (aggStatus == EMPTY_OK || !aggStatus.isOk()) { + aggStatus = stateInfo.getStatus(); + } + } + updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE, + // If all subchannels are TRANSIENT_FAILURE, return the Status associated with + // an arbitrary subchannel, otherwise return OK. + new EmptyPicker(aggStatus)); + } else { + // initialize the Picker to a random start index to ensure that a high frequency of Picker + // churn does not skew subchannel selection. + int startIndex = random.nextInt(activeList.size()); + updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState)); + } + } + + private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) { + if (state != currentState || !picker.isEquivalentTo(currentPicker)) { + helper.updateBalancingState(state, picker); + currentState = state; + currentPicker = picker; + } } /** @@ -229,7 +275,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { Collection<Subchannel> subchannels) { List<Subchannel> readySubchannels = new ArrayList<Subchannel>(subchannels.size()); for (Subchannel subchannel : subchannels) { - if (getSubchannelStateInfoRef(subchannel).value.getState() == READY) { + if (isReady(subchannel)) { readySubchannels.add(subchannel); } } @@ -248,43 +294,6 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { return addrs; } - /** - * If all subchannels are TRANSIENT_FAILURE, return the Status associated with an arbitrary - * subchannel otherwise, return null. - */ - @Nullable - private Status getAggregatedError() { - Status status = null; - for (Subchannel subchannel : getSubchannels()) { - ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value; - if (stateInfo.getState() != TRANSIENT_FAILURE) { - return null; - } - status = stateInfo.getStatus(); - } - return status; - } - - private ConnectivityState getAggregatedState() { - Set<ConnectivityState> states = EnumSet.noneOf(ConnectivityState.class); - for (Subchannel subchannel : getSubchannels()) { - states.add(getSubchannelStateInfoRef(subchannel).value.getState()); - } - if (states.contains(READY)) { - return READY; - } - if (states.contains(CONNECTING)) { - return CONNECTING; - } - if (states.contains(IDLE)) { - // This subchannel IDLE is not because of channel IDLE_TIMEOUT, in which case LB is already - // shutdown. - // RRLB will request connection immediately on subchannel IDLE. - return CONNECTING; - } - return TRANSIENT_FAILURE; - } - @VisibleForTesting Collection<Subchannel> getSubchannels() { return subchannels.values(); @@ -294,6 +303,11 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { Subchannel subchannel) { return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO"); } + + // package-private to avoid synthetic access + static boolean isReady(Subchannel subchannel) { + return getSubchannelStateInfoRef(subchannel).value.getState() == READY; + } private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) { Set<T> aCopy = new HashSet<T>(a); @@ -312,7 +326,8 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { * Holds stickiness related states: The stickiness key, a registry mapping stickiness values to * the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref. */ - private static final class StickinessState { + @VisibleForTesting + static final class StickinessState { static final int MAX_ENTRIES = 1000; final Key<String> key; @@ -332,7 +347,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { */ @Nonnull Subchannel maybeRegister( - String stickinessValue, @Nonnull Subchannel subchannel, List<Subchannel> rrList) { + String stickinessValue, @Nonnull Subchannel subchannel) { final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF); while (true) { Ref<Subchannel> existingSubchannelRef = @@ -344,7 +359,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { } else { // existing entry Subchannel existingSubchannel = existingSubchannelRef.value; - if (existingSubchannel != null && rrList.contains(existingSubchannel)) { + if (existingSubchannel != null && isReady(existingSubchannel)) { return existingSubchannel; } } @@ -384,59 +399,49 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { } } } + + // Only subclasses are ReadyPicker or EmptyPicker + private abstract static class RoundRobinPicker extends SubchannelPicker { + abstract boolean isEquivalentTo(RoundRobinPicker picker); + } @VisibleForTesting - static final class Picker extends SubchannelPicker { - private static final AtomicIntegerFieldUpdater<Picker> indexUpdater = - AtomicIntegerFieldUpdater.newUpdater(Picker.class, "index"); + static final class ReadyPicker extends RoundRobinPicker { + private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater = + AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index"); - @Nullable - private final Status status; - private final List<Subchannel> list; + private final List<Subchannel> list; // non-empty @Nullable private final RoundRobinLoadBalancer.StickinessState stickinessState; @SuppressWarnings("unused") private volatile int index; - Picker( - List<Subchannel> list, @Nullable Status status, int startIndex, + ReadyPicker(List<Subchannel> list, int startIndex, @Nullable RoundRobinLoadBalancer.StickinessState stickinessState) { + Preconditions.checkArgument(!list.isEmpty(), "empty list"); this.list = list; - this.status = status; this.stickinessState = stickinessState; this.index = startIndex - 1; } @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - if (list.size() > 0) { - Subchannel subchannel = null; - if (stickinessState != null) { - String stickinessValue = args.getHeaders().get(stickinessState.key); - if (stickinessValue != null) { - subchannel = stickinessState.getSubchannel(stickinessValue); - if (subchannel == null || !list.contains(subchannel)) { - subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel(), list); - } + Subchannel subchannel = null; + if (stickinessState != null) { + String stickinessValue = args.getHeaders().get(stickinessState.key); + if (stickinessValue != null) { + subchannel = stickinessState.getSubchannel(stickinessValue); + if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) { + subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel()); } } - - return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel()); - } - - if (status != null) { - return PickResult.withError(status); } - return PickResult.withNoResult(); + return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel()); } private Subchannel nextSubchannel() { int size = list.size(); - if (size == 0) { - throw new NoSuchElementException(); - } - int i = indexUpdater.incrementAndGet(this); if (i >= size) { int oldi = i; @@ -451,9 +456,37 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory { return list; } - @VisibleForTesting - Status getStatus() { - return status; + @Override + boolean isEquivalentTo(RoundRobinPicker picker) { + if (!(picker instanceof ReadyPicker)) { + return false; + } + ReadyPicker other = (ReadyPicker) picker; + // the lists cannot contain duplicate subchannels + return other == this || (stickinessState == other.stickinessState + && list.size() == other.list.size() + && new HashSet<Subchannel>(list).containsAll(other.list)); + } + } + + @VisibleForTesting + static final class EmptyPicker extends RoundRobinPicker { + + private final Status status; + + EmptyPicker(@Nonnull Status status) { + this.status = Preconditions.checkNotNull(status, "status"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status); + } + + @Override + boolean isEquivalentTo(RoundRobinPicker picker) { + return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status) + || (status.isOk() && ((EmptyPicker) picker).status.isOk())); } } } diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 567a70f3c..aea42c98f 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -20,13 +20,16 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer.STATE_INFO; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; @@ -49,16 +52,22 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.Status; import io.grpc.internal.GrpcAttributes; -import io.grpc.util.RoundRobinLoadBalancerFactory.Picker; +import io.grpc.util.RoundRobinLoadBalancerFactory.EmptyPicker; +import io.grpc.util.RoundRobinLoadBalancerFactory.ReadyPicker; import io.grpc.util.RoundRobinLoadBalancerFactory.Ref; import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer; +import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer.StickinessState; import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -87,7 +96,7 @@ public class RoundRobinLoadBalancerTest { private Attributes affinity = Attributes.newBuilder().set(MAJOR_KEY, "I got the keys").build(); @Captor - private ArgumentCaptor<Picker> pickerCaptor; + private ArgumentCaptor<SubchannelPicker> pickerCaptor; @Captor private ArgumentCaptor<ConnectivityState> stateCaptor; @Captor @@ -151,7 +160,7 @@ public class RoundRobinLoadBalancerTest { assertEquals(CONNECTING, stateCaptor.getAllValues().get(0)); assertEquals(READY, stateCaptor.getAllValues().get(1)); - assertThat(pickerCaptor.getValue().getList()).containsExactly(readySubchannel); + assertThat(getList(pickerCaptor.getValue())).containsExactly(readySubchannel); verifyNoMoreInteractions(mockHelper); } @@ -195,9 +204,8 @@ public class RoundRobinLoadBalancerTest { InOrder inOrder = inOrder(mockHelper); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); - assertNull(picker.getStatus()); - assertThat(picker.getList()).containsExactly(removedSubchannel, oldSubchannel); + SubchannelPicker picker = pickerCaptor.getValue(); + assertThat(getList(picker)).containsExactly(removedSubchannel, oldSubchannel); verify(removedSubchannel, times(1)).requestConnection(); verify(oldSubchannel, times(1)).requestConnection(); @@ -218,6 +226,9 @@ public class RoundRobinLoadBalancerTest { verify(newSubchannel, times(1)).requestConnection(); verify(removedSubchannel, times(1)).shutdown(); + + loadBalancer.handleSubchannelState(removedSubchannel, + ConnectivityStateInfo.forNonError(SHUTDOWN)); assertThat(loadBalancer.getSubchannels()).containsExactly(oldSubchannel, newSubchannel); @@ -227,8 +238,14 @@ public class RoundRobinLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = pickerCaptor.getValue(); - assertNull(picker.getStatus()); - assertThat(picker.getList()).containsExactly(oldSubchannel, newSubchannel); + assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel); + + // test going from non-empty to empty + loadBalancer.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), + affinity); + + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } @@ -241,13 +258,13 @@ public class RoundRobinLoadBalancerTest { Ref<ConnectivityStateInfo> subchannelStateInfo = subchannel.getAttributes().get( STATE_INFO); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(Picker.class)); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); assertThat(subchannelStateInfo.value).isEqualTo(ConnectivityStateInfo.forNonError(IDLE)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - assertNull(pickerCaptor.getValue().getStatus()); + assertThat(pickerCaptor.getValue()).isInstanceOf(ReadyPicker.class); assertThat(subchannelStateInfo.value).isEqualTo( ConnectivityStateInfo.forNonError(READY)); @@ -257,12 +274,10 @@ public class RoundRobinLoadBalancerTest { assertThat(subchannelStateInfo.value).isEqualTo( ConnectivityStateInfo.forTransientFailure(error)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - assertNull(pickerCaptor.getValue().getStatus()); + assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - assertNull(pickerCaptor.getValue().getStatus()); assertThat(subchannelStateInfo.value).isEqualTo( ConnectivityStateInfo.forNonError(IDLE)); @@ -282,9 +297,9 @@ public class RoundRobinLoadBalancerTest { Subchannel subchannel1 = mock(Subchannel.class); Subchannel subchannel2 = mock(Subchannel.class); - Picker picker = new Picker(Collections.unmodifiableList(Lists.newArrayList( - subchannel, subchannel1, subchannel2)), null /* status */, 0 /* startIndex */, - null /* stickinessState */); + ReadyPicker picker = new ReadyPicker(Collections.unmodifiableList( + Lists.<Subchannel>newArrayList(subchannel, subchannel1, subchannel2)), + 0 /* startIndex */, null /* stickinessState */); assertThat(picker.getList()).containsExactly(subchannel, subchannel1, subchannel2); @@ -296,8 +311,7 @@ public class RoundRobinLoadBalancerTest { @Test public void pickerEmptyList() throws Exception { - Picker picker = - new Picker(Lists.<Subchannel>newArrayList(), Status.UNKNOWN, 0, null /* stickinessState */); + SubchannelPicker picker = new EmptyPicker(Status.UNKNOWN); assertEquals(null, picker.pickSubchannel(mockArgs).getSubchannel()); assertEquals(Status.UNKNOWN, @@ -363,23 +377,23 @@ public class RoundRobinLoadBalancerTest { verify(mockHelper, times(6)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); Iterator<ConnectivityState> stateIterator = stateCaptor.getAllValues().iterator(); - Iterator<Picker> pickers = pickerCaptor.getAllValues().iterator(); + Iterator<SubchannelPicker> pickers = pickerCaptor.getAllValues().iterator(); // The picker is incrementally updated as subchannels become READY assertEquals(CONNECTING, stateIterator.next()); - assertThat(pickers.next().getList()).isEmpty(); + assertThat(pickers.next()).isInstanceOf(EmptyPicker.class); assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1); + assertThat(getList(pickers.next())).containsExactly(sc1); assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1, sc2); + assertThat(getList(pickers.next())).containsExactly(sc1, sc2); assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1, sc2, sc3); + assertThat(getList(pickers.next())).containsExactly(sc1, sc2, sc3); // The IDLE subchannel is dropped from the picker, but a reconnection is requested assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1, sc3); + assertThat(getList(pickers.next())).containsExactly(sc1, sc3); verify(sc2, times(2)).requestConnection(); // The failing subchannel is dropped from the picker, with no requested reconnect assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1); + assertThat(getList(pickers.next())).containsExactly(sc1); verify(sc3, times(1)).requestConnection(); assertThat(stateIterator.hasNext()).isFalse(); assertThat(pickers.hasNext()).isFalse(); @@ -393,14 +407,14 @@ public class RoundRobinLoadBalancerTest { } verify(mockHelper, times(4)) .updateBalancingState(any(ConnectivityState.class), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue = new Metadata(); headerWithStickinessValue.put(stickinessKey, "my-sticky-value"); doReturn(headerWithStickinessValue).when(mockArgs).getHeaders(); - List<Subchannel> allSubchannels = picker.getList(); + List<Subchannel> allSubchannels = getList(picker); Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel(); Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel(); Subchannel sc3 = picker.pickSubchannel(mockArgs).getSubchannel(); @@ -426,11 +440,11 @@ public class RoundRobinLoadBalancerTest { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); doReturn(new Metadata()).when(mockArgs).getHeaders(); - List<Subchannel> allSubchannels = picker.getList(); + List<Subchannel> allSubchannels = getList(picker); Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel(); Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel(); @@ -458,7 +472,7 @@ public class RoundRobinLoadBalancerTest { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue = new Metadata(); @@ -488,7 +502,7 @@ public class RoundRobinLoadBalancerTest { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue1 = new Metadata(); @@ -497,7 +511,7 @@ public class RoundRobinLoadBalancerTest { Metadata headerWithStickinessValue2 = new Metadata(); headerWithStickinessValue2.put(stickinessKey, "my-sticky-value2"); - List<Subchannel> allSubchannels = picker.getList(); + List<Subchannel> allSubchannels = getList(picker); doReturn(headerWithStickinessValue1).when(mockArgs).getHeaders(); Subchannel sc1a = picker.pickSubchannel(mockArgs).getSubchannel(); @@ -533,16 +547,13 @@ public class RoundRobinLoadBalancerTest { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue = new Metadata(); headerWithStickinessValue.put(stickinessKey, "my-sticky-value"); doReturn(headerWithStickinessValue).when(mockArgs).getHeaders(); - @SuppressWarnings("unused") - List<Subchannel> allSubchannels = picker.getList(); - // first pick Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel(); @@ -584,16 +595,13 @@ public class RoundRobinLoadBalancerTest { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue1 = new Metadata(); headerWithStickinessValue1.put(stickinessKey, "my-sticky-value"); doReturn(headerWithStickinessValue1).when(mockArgs).getHeaders(); - @SuppressWarnings("unused") - List<Subchannel> allSubchannels = picker.getList(); - // first pick Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel(); @@ -609,7 +617,6 @@ public class RoundRobinLoadBalancerTest { picker = pickerCaptor.getValue(); // second pick with a different stickiness value - @SuppressWarnings("unused") Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel(); // go back to ready @@ -641,17 +648,18 @@ public class RoundRobinLoadBalancerTest { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue = new Metadata(); headerWithStickinessValue.put(stickinessKey, "my-sticky-value"); doReturn(headerWithStickinessValue).when(mockArgs).getHeaders(); - List<Subchannel> allSubchannels = Lists.newArrayList(picker.getList()); + List<Subchannel> allSubchannels = Lists.newArrayList(getList(picker)); Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel(); + // shutdown channel directly loadBalancer .handleSubchannelState(sc1, ConnectivityStateInfo.forNonError(ConnectivityState.SHUTDOWN)); @@ -661,6 +669,27 @@ public class RoundRobinLoadBalancerTest { picker.pickSubchannel(mockArgs).getSubchannel()); assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1); verify(mockArgs, atLeast(2)).getHeaders(); + + Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel(); + + assertEquals(sc2, loadBalancer.getStickinessMapForTest().get("my-sticky-value").value); + + // shutdown channel via name resolver change + List<EquivalentAddressGroup> newServers = new ArrayList<EquivalentAddressGroup>(servers); + newServers.remove(sc2.getAddresses()); + + loadBalancer.handleResolvedAddressGroups(newServers, attributes); + + verify(sc2, times(1)).shutdown(); + + loadBalancer.handleSubchannelState(sc2, ConnectivityStateInfo.forNonError(SHUTDOWN)); + + assertNull(loadBalancer.getStickinessMapForTest().get("my-sticky-value").value); + + assertEquals(nextSubchannel(sc2, allSubchannels), + picker.pickSubchannel(mockArgs).getSubchannel()); + assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1); + verify(mockArgs, atLeast(2)).getHeaders(); } @Test @@ -696,6 +725,44 @@ public class RoundRobinLoadBalancerTest { assertSame(stickinessMap1, stickinessMap2); } + + @Test(expected = IllegalArgumentException.class) + public void readyPicker_emptyList() { + // ready picker list must be non-empty + new ReadyPicker(Collections.<Subchannel>emptyList(), 0, null); + } + + @Test + public void internalPickerComparisons() { + EmptyPicker emptyOk1 = new EmptyPicker(Status.OK); + EmptyPicker emptyOk2 = new EmptyPicker(Status.OK.withDescription("different OK")); + EmptyPicker emptyErr = new EmptyPicker(Status.UNKNOWN.withDescription("¯\\_(ツ)_//¯")); + + Iterator<Subchannel> subchannelIterator = subchannels.values().iterator(); + Subchannel sc1 = subchannelIterator.next(); + Subchannel sc2 = subchannelIterator.next(); + StickinessState stickinessState = new StickinessState("stick-key"); + ReadyPicker ready1 = new ReadyPicker(Arrays.asList(sc1, sc2), 0, null); + ReadyPicker ready2 = new ReadyPicker(Arrays.asList(sc1), 0, null); + ReadyPicker ready3 = new ReadyPicker(Arrays.asList(sc2, sc1), 1, null); + ReadyPicker ready4 = new ReadyPicker(Arrays.asList(sc1, sc2), 1, stickinessState); + ReadyPicker ready5 = new ReadyPicker(Arrays.asList(sc2, sc1), 0, stickinessState); + + assertTrue(emptyOk1.isEquivalentTo(emptyOk2)); + assertFalse(emptyOk1.isEquivalentTo(emptyErr)); + assertFalse(ready1.isEquivalentTo(ready2)); + assertTrue(ready1.isEquivalentTo(ready3)); + assertFalse(ready3.isEquivalentTo(ready4)); + assertTrue(ready4.isEquivalentTo(ready5)); + assertFalse(emptyOk1.isEquivalentTo(ready1)); + assertFalse(ready1.isEquivalentTo(emptyOk1)); + } + + + private static List<Subchannel> getList(SubchannelPicker picker) { + return picker instanceof ReadyPicker ? ((ReadyPicker) picker).getList() : + Collections.<Subchannel>emptyList(); + } private static class FakeSocketAddress extends SocketAddress { final String name; |