aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorNick Hill <nickhill@us.ibm.com>2018-09-12 23:06:46 +0100
committerCarl Mastrangelo <notcarl@google.com>2018-09-12 15:06:46 -0700
commited709ff9ff37e0b7b30be05efb63c1fd3cbb8741 (patch)
tree4617895a95ebf2da6a816e214a02ef13aacdd59a /core
parent7270938ebf8a4cfb30f70b298335d0708caef366 (diff)
downloadgrpc-grpc-java-ed709ff9ff37e0b7b30be05efb63c1fd3cbb8741.tar.gz
core: remove redundant SubchannelPicker refreshes in RoundRobinLoadBalancer
* Remove redundant SubchannelPicker refreshes in RoundRobinLoadBalancer - Ensure active subchannel list and round-robin index is only regenerated/refreshed when it changes - Make it so that Subchannels exist in subchannels map iff their state != SHUTDOWN - Add EmptyPicker class since logic for this case is disjoint from the non-empty case * remove explicit initialization of boolean ready field per @carl-mastrangelo's review comment * minor restructuring to make logic clearer; more explanatory comments * move some checks inside updateBalancingState method for clarity * store current state and picker in RRLB, only update when new one is diff * some more simplification/refactoring; improve test coverage - remove now redundant check in handleSubchannelState - collapse getAggregatedState() and getAggregatedError() into handleBalancingState() - have both pickers extend new RoundRobinPicker, move areEquivalentPickers() logic into RoundRobinPicker.isEquivalentTo() - extend unit tests to cover some additional cases * Address latest review comments from @zhangkun83 - Use explicit check for non-empty list instead of assert - Change EmptyPicker.status to be non-nullable - Further test coverage improvement including explicit picker comparison tests * use EMPTY_OK instead of Status.OK for initial empty picker
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java213
-rw-r--r--core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java153
2 files changed, 233 insertions, 133 deletions
diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java
index cfb989e5b..2e6e5e053 100644
--- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java
+++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java
@@ -24,6 +24,9 @@ import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
@@ -42,12 +45,10 @@ import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -113,6 +114,9 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
new HashMap<EquivalentAddressGroup, Subchannel>();
private final Random random;
+ private ConnectivityState currentState;
+ private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);
+
@Nullable
private StickinessState stickinessState;
@@ -176,50 +180,92 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
// Shutdown subchannels for removed addresses.
for (EquivalentAddressGroup addressGroup : removedAddrs) {
Subchannel subchannel = subchannels.remove(addressGroup);
- subchannel.shutdown();
+ shutdownSubchannel(subchannel);
}
- updateBalancingState(getAggregatedState(), getAggregatedError());
+ updateBalancingState();
}
@Override
public void handleNameResolutionError(Status error) {
- updateBalancingState(TRANSIENT_FAILURE, error);
+ // ready pickers aren't affected by status changes
+ updateBalancingState(TRANSIENT_FAILURE,
+ currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error));
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
- if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
- stickinessState.remove(subchannel);
- }
if (subchannels.get(subchannel.getAddresses()) != subchannel) {
return;
}
+ if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
+ stickinessState.remove(subchannel);
+ }
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
getSubchannelStateInfoRef(subchannel).value = stateInfo;
- updateBalancingState(getAggregatedState(), getAggregatedError());
+ updateBalancingState();
+ }
+
+ private void shutdownSubchannel(Subchannel subchannel) {
+ subchannel.shutdown();
+ getSubchannelStateInfoRef(subchannel).value =
+ ConnectivityStateInfo.forNonError(SHUTDOWN);
+ if (stickinessState != null) {
+ stickinessState.remove(subchannel);
+ }
}
@Override
public void shutdown() {
for (Subchannel subchannel : getSubchannels()) {
- subchannel.shutdown();
+ shutdownSubchannel(subchannel);
}
}
+ private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");
+
/**
* Updates picker with the list of active subchannels (state == READY).
*/
- private void updateBalancingState(ConnectivityState state, Status error) {
+ @SuppressWarnings("ReferenceEquality")
+ private void updateBalancingState() {
List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels());
- // initialize the Picker to a random start index to ensure that a high frequency of Picker
- // churn does not skew subchannel selection.
- int startIndex = activeList.isEmpty() ? 0 : random.nextInt(activeList.size());
- helper.updateBalancingState(
- state,
- new Picker(activeList, error, startIndex, stickinessState));
+ if (activeList.isEmpty()) {
+ // No READY subchannels, determine aggregate state and error status
+ boolean isConnecting = false;
+ Status aggStatus = EMPTY_OK;
+ for (Subchannel subchannel : getSubchannels()) {
+ ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
+ // This subchannel IDLE is not because of channel IDLE_TIMEOUT,
+ // in which case LB is already shutdown.
+ // RRLB will request connection immediately on subchannel IDLE.
+ if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
+ isConnecting = true;
+ }
+ if (aggStatus == EMPTY_OK || !aggStatus.isOk()) {
+ aggStatus = stateInfo.getStatus();
+ }
+ }
+ updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE,
+ // If all subchannels are TRANSIENT_FAILURE, return the Status associated with
+ // an arbitrary subchannel, otherwise return OK.
+ new EmptyPicker(aggStatus));
+ } else {
+ // initialize the Picker to a random start index to ensure that a high frequency of Picker
+ // churn does not skew subchannel selection.
+ int startIndex = random.nextInt(activeList.size());
+ updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState));
+ }
+ }
+
+ private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) {
+ if (state != currentState || !picker.isEquivalentTo(currentPicker)) {
+ helper.updateBalancingState(state, picker);
+ currentState = state;
+ currentPicker = picker;
+ }
}
/**
@@ -229,7 +275,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
Collection<Subchannel> subchannels) {
List<Subchannel> readySubchannels = new ArrayList<Subchannel>(subchannels.size());
for (Subchannel subchannel : subchannels) {
- if (getSubchannelStateInfoRef(subchannel).value.getState() == READY) {
+ if (isReady(subchannel)) {
readySubchannels.add(subchannel);
}
}
@@ -248,43 +294,6 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
return addrs;
}
- /**
- * If all subchannels are TRANSIENT_FAILURE, return the Status associated with an arbitrary
- * subchannel otherwise, return null.
- */
- @Nullable
- private Status getAggregatedError() {
- Status status = null;
- for (Subchannel subchannel : getSubchannels()) {
- ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
- if (stateInfo.getState() != TRANSIENT_FAILURE) {
- return null;
- }
- status = stateInfo.getStatus();
- }
- return status;
- }
-
- private ConnectivityState getAggregatedState() {
- Set<ConnectivityState> states = EnumSet.noneOf(ConnectivityState.class);
- for (Subchannel subchannel : getSubchannels()) {
- states.add(getSubchannelStateInfoRef(subchannel).value.getState());
- }
- if (states.contains(READY)) {
- return READY;
- }
- if (states.contains(CONNECTING)) {
- return CONNECTING;
- }
- if (states.contains(IDLE)) {
- // This subchannel IDLE is not because of channel IDLE_TIMEOUT, in which case LB is already
- // shutdown.
- // RRLB will request connection immediately on subchannel IDLE.
- return CONNECTING;
- }
- return TRANSIENT_FAILURE;
- }
-
@VisibleForTesting
Collection<Subchannel> getSubchannels() {
return subchannels.values();
@@ -294,6 +303,11 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
Subchannel subchannel) {
return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO");
}
+
+ // package-private to avoid synthetic access
+ static boolean isReady(Subchannel subchannel) {
+ return getSubchannelStateInfoRef(subchannel).value.getState() == READY;
+ }
private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
Set<T> aCopy = new HashSet<T>(a);
@@ -312,7 +326,8 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
* Holds stickiness related states: The stickiness key, a registry mapping stickiness values to
* the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref.
*/
- private static final class StickinessState {
+ @VisibleForTesting
+ static final class StickinessState {
static final int MAX_ENTRIES = 1000;
final Key<String> key;
@@ -332,7 +347,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
*/
@Nonnull
Subchannel maybeRegister(
- String stickinessValue, @Nonnull Subchannel subchannel, List<Subchannel> rrList) {
+ String stickinessValue, @Nonnull Subchannel subchannel) {
final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF);
while (true) {
Ref<Subchannel> existingSubchannelRef =
@@ -344,7 +359,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
} else {
// existing entry
Subchannel existingSubchannel = existingSubchannelRef.value;
- if (existingSubchannel != null && rrList.contains(existingSubchannel)) {
+ if (existingSubchannel != null && isReady(existingSubchannel)) {
return existingSubchannel;
}
}
@@ -384,59 +399,49 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
}
}
}
+
+ // Only subclasses are ReadyPicker or EmptyPicker
+ private abstract static class RoundRobinPicker extends SubchannelPicker {
+ abstract boolean isEquivalentTo(RoundRobinPicker picker);
+ }
@VisibleForTesting
- static final class Picker extends SubchannelPicker {
- private static final AtomicIntegerFieldUpdater<Picker> indexUpdater =
- AtomicIntegerFieldUpdater.newUpdater(Picker.class, "index");
+ static final class ReadyPicker extends RoundRobinPicker {
+ private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");
- @Nullable
- private final Status status;
- private final List<Subchannel> list;
+ private final List<Subchannel> list; // non-empty
@Nullable
private final RoundRobinLoadBalancer.StickinessState stickinessState;
@SuppressWarnings("unused")
private volatile int index;
- Picker(
- List<Subchannel> list, @Nullable Status status, int startIndex,
+ ReadyPicker(List<Subchannel> list, int startIndex,
@Nullable RoundRobinLoadBalancer.StickinessState stickinessState) {
+ Preconditions.checkArgument(!list.isEmpty(), "empty list");
this.list = list;
- this.status = status;
this.stickinessState = stickinessState;
this.index = startIndex - 1;
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
- if (list.size() > 0) {
- Subchannel subchannel = null;
- if (stickinessState != null) {
- String stickinessValue = args.getHeaders().get(stickinessState.key);
- if (stickinessValue != null) {
- subchannel = stickinessState.getSubchannel(stickinessValue);
- if (subchannel == null || !list.contains(subchannel)) {
- subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel(), list);
- }
+ Subchannel subchannel = null;
+ if (stickinessState != null) {
+ String stickinessValue = args.getHeaders().get(stickinessState.key);
+ if (stickinessValue != null) {
+ subchannel = stickinessState.getSubchannel(stickinessValue);
+ if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) {
+ subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel());
}
}
-
- return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
- }
-
- if (status != null) {
- return PickResult.withError(status);
}
- return PickResult.withNoResult();
+ return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
}
private Subchannel nextSubchannel() {
int size = list.size();
- if (size == 0) {
- throw new NoSuchElementException();
- }
-
int i = indexUpdater.incrementAndGet(this);
if (i >= size) {
int oldi = i;
@@ -451,9 +456,37 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
return list;
}
- @VisibleForTesting
- Status getStatus() {
- return status;
+ @Override
+ boolean isEquivalentTo(RoundRobinPicker picker) {
+ if (!(picker instanceof ReadyPicker)) {
+ return false;
+ }
+ ReadyPicker other = (ReadyPicker) picker;
+ // the lists cannot contain duplicate subchannels
+ return other == this || (stickinessState == other.stickinessState
+ && list.size() == other.list.size()
+ && new HashSet<Subchannel>(list).containsAll(other.list));
+ }
+ }
+
+ @VisibleForTesting
+ static final class EmptyPicker extends RoundRobinPicker {
+
+ private final Status status;
+
+ EmptyPicker(@Nonnull Status status) {
+ this.status = Preconditions.checkNotNull(status, "status");
+ }
+
+ @Override
+ public PickResult pickSubchannel(PickSubchannelArgs args) {
+ return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status);
+ }
+
+ @Override
+ boolean isEquivalentTo(RoundRobinPicker picker) {
+ return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status)
+ || (status.isOk() && ((EmptyPicker) picker).status.isOk()));
}
}
}
diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java
index 567a70f3c..aea42c98f 100644
--- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java
+++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java
@@ -20,13 +20,16 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer.STATE_INFO;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
@@ -49,16 +52,22 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;
-import io.grpc.util.RoundRobinLoadBalancerFactory.Picker;
+import io.grpc.util.RoundRobinLoadBalancerFactory.EmptyPicker;
+import io.grpc.util.RoundRobinLoadBalancerFactory.ReadyPicker;
import io.grpc.util.RoundRobinLoadBalancerFactory.Ref;
import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer;
+import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer.StickinessState;
import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -87,7 +96,7 @@ public class RoundRobinLoadBalancerTest {
private Attributes affinity = Attributes.newBuilder().set(MAJOR_KEY, "I got the keys").build();
@Captor
- private ArgumentCaptor<Picker> pickerCaptor;
+ private ArgumentCaptor<SubchannelPicker> pickerCaptor;
@Captor
private ArgumentCaptor<ConnectivityState> stateCaptor;
@Captor
@@ -151,7 +160,7 @@ public class RoundRobinLoadBalancerTest {
assertEquals(CONNECTING, stateCaptor.getAllValues().get(0));
assertEquals(READY, stateCaptor.getAllValues().get(1));
- assertThat(pickerCaptor.getValue().getList()).containsExactly(readySubchannel);
+ assertThat(getList(pickerCaptor.getValue())).containsExactly(readySubchannel);
verifyNoMoreInteractions(mockHelper);
}
@@ -195,9 +204,8 @@ public class RoundRobinLoadBalancerTest {
InOrder inOrder = inOrder(mockHelper);
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
- Picker picker = pickerCaptor.getValue();
- assertNull(picker.getStatus());
- assertThat(picker.getList()).containsExactly(removedSubchannel, oldSubchannel);
+ SubchannelPicker picker = pickerCaptor.getValue();
+ assertThat(getList(picker)).containsExactly(removedSubchannel, oldSubchannel);
verify(removedSubchannel, times(1)).requestConnection();
verify(oldSubchannel, times(1)).requestConnection();
@@ -218,6 +226,9 @@ public class RoundRobinLoadBalancerTest {
verify(newSubchannel, times(1)).requestConnection();
verify(removedSubchannel, times(1)).shutdown();
+
+ loadBalancer.handleSubchannelState(removedSubchannel,
+ ConnectivityStateInfo.forNonError(SHUTDOWN));
assertThat(loadBalancer.getSubchannels()).containsExactly(oldSubchannel,
newSubchannel);
@@ -227,8 +238,14 @@ public class RoundRobinLoadBalancerTest {
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
picker = pickerCaptor.getValue();
- assertNull(picker.getStatus());
- assertThat(picker.getList()).containsExactly(oldSubchannel, newSubchannel);
+ assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel);
+
+ // test going from non-empty to empty
+ loadBalancer.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(),
+ affinity);
+
+ inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
+ assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs));
verifyNoMoreInteractions(mockHelper);
}
@@ -241,13 +258,13 @@ public class RoundRobinLoadBalancerTest {
Ref<ConnectivityStateInfo> subchannelStateInfo = subchannel.getAttributes().get(
STATE_INFO);
- inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(Picker.class));
+ inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));
assertThat(subchannelStateInfo.value).isEqualTo(ConnectivityStateInfo.forNonError(IDLE));
loadBalancer.handleSubchannelState(subchannel,
ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
- assertNull(pickerCaptor.getValue().getStatus());
+ assertThat(pickerCaptor.getValue()).isInstanceOf(ReadyPicker.class);
assertThat(subchannelStateInfo.value).isEqualTo(
ConnectivityStateInfo.forNonError(READY));
@@ -257,12 +274,10 @@ public class RoundRobinLoadBalancerTest {
assertThat(subchannelStateInfo.value).isEqualTo(
ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
- assertNull(pickerCaptor.getValue().getStatus());
+ assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class);
loadBalancer.handleSubchannelState(subchannel,
ConnectivityStateInfo.forNonError(IDLE));
- inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
- assertNull(pickerCaptor.getValue().getStatus());
assertThat(subchannelStateInfo.value).isEqualTo(
ConnectivityStateInfo.forNonError(IDLE));
@@ -282,9 +297,9 @@ public class RoundRobinLoadBalancerTest {
Subchannel subchannel1 = mock(Subchannel.class);
Subchannel subchannel2 = mock(Subchannel.class);
- Picker picker = new Picker(Collections.unmodifiableList(Lists.newArrayList(
- subchannel, subchannel1, subchannel2)), null /* status */, 0 /* startIndex */,
- null /* stickinessState */);
+ ReadyPicker picker = new ReadyPicker(Collections.unmodifiableList(
+ Lists.<Subchannel>newArrayList(subchannel, subchannel1, subchannel2)),
+ 0 /* startIndex */, null /* stickinessState */);
assertThat(picker.getList()).containsExactly(subchannel, subchannel1, subchannel2);
@@ -296,8 +311,7 @@ public class RoundRobinLoadBalancerTest {
@Test
public void pickerEmptyList() throws Exception {
- Picker picker =
- new Picker(Lists.<Subchannel>newArrayList(), Status.UNKNOWN, 0, null /* stickinessState */);
+ SubchannelPicker picker = new EmptyPicker(Status.UNKNOWN);
assertEquals(null, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(Status.UNKNOWN,
@@ -363,23 +377,23 @@ public class RoundRobinLoadBalancerTest {
verify(mockHelper, times(6))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Iterator<ConnectivityState> stateIterator = stateCaptor.getAllValues().iterator();
- Iterator<Picker> pickers = pickerCaptor.getAllValues().iterator();
+ Iterator<SubchannelPicker> pickers = pickerCaptor.getAllValues().iterator();
// The picker is incrementally updated as subchannels become READY
assertEquals(CONNECTING, stateIterator.next());
- assertThat(pickers.next().getList()).isEmpty();
+ assertThat(pickers.next()).isInstanceOf(EmptyPicker.class);
assertEquals(READY, stateIterator.next());
- assertThat(pickers.next().getList()).containsExactly(sc1);
+ assertThat(getList(pickers.next())).containsExactly(sc1);
assertEquals(READY, stateIterator.next());
- assertThat(pickers.next().getList()).containsExactly(sc1, sc2);
+ assertThat(getList(pickers.next())).containsExactly(sc1, sc2);
assertEquals(READY, stateIterator.next());
- assertThat(pickers.next().getList()).containsExactly(sc1, sc2, sc3);
+ assertThat(getList(pickers.next())).containsExactly(sc1, sc2, sc3);
// The IDLE subchannel is dropped from the picker, but a reconnection is requested
assertEquals(READY, stateIterator.next());
- assertThat(pickers.next().getList()).containsExactly(sc1, sc3);
+ assertThat(getList(pickers.next())).containsExactly(sc1, sc3);
verify(sc2, times(2)).requestConnection();
// The failing subchannel is dropped from the picker, with no requested reconnect
assertEquals(READY, stateIterator.next());
- assertThat(pickers.next().getList()).containsExactly(sc1);
+ assertThat(getList(pickers.next())).containsExactly(sc1);
verify(sc3, times(1)).requestConnection();
assertThat(stateIterator.hasNext()).isFalse();
assertThat(pickers.hasNext()).isFalse();
@@ -393,14 +407,14 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(any(ConnectivityState.class), pickerCaptor.capture());
- Picker picker = pickerCaptor.getValue();
+ SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
headerWithStickinessValue.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue).when(mockArgs).getHeaders();
- List<Subchannel> allSubchannels = picker.getList();
+ List<Subchannel> allSubchannels = getList(picker);
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel();
Subchannel sc3 = picker.pickSubchannel(mockArgs).getSubchannel();
@@ -426,11 +440,11 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
- Picker picker = pickerCaptor.getValue();
+ SubchannelPicker picker = pickerCaptor.getValue();
doReturn(new Metadata()).when(mockArgs).getHeaders();
- List<Subchannel> allSubchannels = picker.getList();
+ List<Subchannel> allSubchannels = getList(picker);
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel();
@@ -458,7 +472,7 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
- Picker picker = pickerCaptor.getValue();
+ SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
@@ -488,7 +502,7 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
- Picker picker = pickerCaptor.getValue();
+ SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue1 = new Metadata();
@@ -497,7 +511,7 @@ public class RoundRobinLoadBalancerTest {
Metadata headerWithStickinessValue2 = new Metadata();
headerWithStickinessValue2.put(stickinessKey, "my-sticky-value2");
- List<Subchannel> allSubchannels = picker.getList();
+ List<Subchannel> allSubchannels = getList(picker);
doReturn(headerWithStickinessValue1).when(mockArgs).getHeaders();
Subchannel sc1a = picker.pickSubchannel(mockArgs).getSubchannel();
@@ -533,16 +547,13 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
- Picker picker = pickerCaptor.getValue();
+ SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
headerWithStickinessValue.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue).when(mockArgs).getHeaders();
- @SuppressWarnings("unused")
- List<Subchannel> allSubchannels = picker.getList();
-
// first pick
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
@@ -584,16 +595,13 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
- Picker picker = pickerCaptor.getValue();
+ SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue1 = new Metadata();
headerWithStickinessValue1.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue1).when(mockArgs).getHeaders();
- @SuppressWarnings("unused")
- List<Subchannel> allSubchannels = picker.getList();
-
// first pick
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
@@ -609,7 +617,6 @@ public class RoundRobinLoadBalancerTest {
picker = pickerCaptor.getValue();
// second pick with a different stickiness value
- @SuppressWarnings("unused")
Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel();
// go back to ready
@@ -641,17 +648,18 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
- Picker picker = pickerCaptor.getValue();
+ SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
headerWithStickinessValue.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue).when(mockArgs).getHeaders();
- List<Subchannel> allSubchannels = Lists.newArrayList(picker.getList());
+ List<Subchannel> allSubchannels = Lists.newArrayList(getList(picker));
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
+ // shutdown channel directly
loadBalancer
.handleSubchannelState(sc1, ConnectivityStateInfo.forNonError(ConnectivityState.SHUTDOWN));
@@ -661,6 +669,27 @@ public class RoundRobinLoadBalancerTest {
picker.pickSubchannel(mockArgs).getSubchannel());
assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1);
verify(mockArgs, atLeast(2)).getHeaders();
+
+ Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel();
+
+ assertEquals(sc2, loadBalancer.getStickinessMapForTest().get("my-sticky-value").value);
+
+ // shutdown channel via name resolver change
+ List<EquivalentAddressGroup> newServers = new ArrayList<EquivalentAddressGroup>(servers);
+ newServers.remove(sc2.getAddresses());
+
+ loadBalancer.handleResolvedAddressGroups(newServers, attributes);
+
+ verify(sc2, times(1)).shutdown();
+
+ loadBalancer.handleSubchannelState(sc2, ConnectivityStateInfo.forNonError(SHUTDOWN));
+
+ assertNull(loadBalancer.getStickinessMapForTest().get("my-sticky-value").value);
+
+ assertEquals(nextSubchannel(sc2, allSubchannels),
+ picker.pickSubchannel(mockArgs).getSubchannel());
+ assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1);
+ verify(mockArgs, atLeast(2)).getHeaders();
}
@Test
@@ -696,6 +725,44 @@ public class RoundRobinLoadBalancerTest {
assertSame(stickinessMap1, stickinessMap2);
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void readyPicker_emptyList() {
+ // ready picker list must be non-empty
+ new ReadyPicker(Collections.<Subchannel>emptyList(), 0, null);
+ }
+
+ @Test
+ public void internalPickerComparisons() {
+ EmptyPicker emptyOk1 = new EmptyPicker(Status.OK);
+ EmptyPicker emptyOk2 = new EmptyPicker(Status.OK.withDescription("different OK"));
+ EmptyPicker emptyErr = new EmptyPicker(Status.UNKNOWN.withDescription("¯\\_(ツ)_//¯"));
+
+ Iterator<Subchannel> subchannelIterator = subchannels.values().iterator();
+ Subchannel sc1 = subchannelIterator.next();
+ Subchannel sc2 = subchannelIterator.next();
+ StickinessState stickinessState = new StickinessState("stick-key");
+ ReadyPicker ready1 = new ReadyPicker(Arrays.asList(sc1, sc2), 0, null);
+ ReadyPicker ready2 = new ReadyPicker(Arrays.asList(sc1), 0, null);
+ ReadyPicker ready3 = new ReadyPicker(Arrays.asList(sc2, sc1), 1, null);
+ ReadyPicker ready4 = new ReadyPicker(Arrays.asList(sc1, sc2), 1, stickinessState);
+ ReadyPicker ready5 = new ReadyPicker(Arrays.asList(sc2, sc1), 0, stickinessState);
+
+ assertTrue(emptyOk1.isEquivalentTo(emptyOk2));
+ assertFalse(emptyOk1.isEquivalentTo(emptyErr));
+ assertFalse(ready1.isEquivalentTo(ready2));
+ assertTrue(ready1.isEquivalentTo(ready3));
+ assertFalse(ready3.isEquivalentTo(ready4));
+ assertTrue(ready4.isEquivalentTo(ready5));
+ assertFalse(emptyOk1.isEquivalentTo(ready1));
+ assertFalse(ready1.isEquivalentTo(emptyOk1));
+ }
+
+
+ private static List<Subchannel> getList(SubchannelPicker picker) {
+ return picker instanceof ReadyPicker ? ((ReadyPicker) picker).getList() :
+ Collections.<Subchannel>emptyList();
+ }
private static class FakeSocketAddress extends SocketAddress {
final String name;