diff options
author | Carl Mastrangelo <notcarl@google.com> | 2018-07-24 13:00:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-07-24 13:00:49 -0700 |
commit | 72179e22a57865264ac6e1838e0a9d46f96d0556 (patch) | |
tree | de57de4fb47c835145bcc815d7b3952296a1f17c /grpclb | |
parent | 8da06a8bc402a8963b2840dfe1967be2d13b4193 (diff) | |
download | grpc-grpc-java-72179e22a57865264ac6e1838e0a9d46f96d0556.tar.gz |
grpclb: remove unnecessary support for lb delegation
Diffstat (limited to 'grpclb')
4 files changed, 33 insertions, 383 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java index ebc2e3d7a..87e2b6bc6 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbConstants.java @@ -16,7 +16,6 @@ package io.grpc.grpclb; -import io.grpc.Attributes; import io.grpc.ExperimentalApi; import io.grpc.Metadata; @@ -25,20 +24,6 @@ import io.grpc.Metadata; */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1782") public final class GrpclbConstants { - /** - * The load-balancing policy designated by the naming system. - */ - public enum LbPolicy { - PICK_FIRST, - ROUND_ROBIN, - GRPCLB - } - - /** - * An attribute of a name resolution result, designating the LB policy. - */ - public static final Attributes.Key<LbPolicy> ATTR_LB_POLICY = - Attributes.Key.create("io.grpc.grpclb.lbPolicy"); /** * The opaque token given by the remote balancer for each returned server address. The client diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 24e870fb4..f6127a916 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -24,7 +24,6 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.Status; -import io.grpc.grpclb.GrpclbConstants.LbPolicy; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.LogId; @@ -35,8 +34,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; -import java.util.logging.Level; -import java.util.logging.Logger; + import javax.annotation.Nullable; /** @@ -46,46 +44,32 @@ import javax.annotation.Nullable; * or round-robin balancer. */ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { - private static final Logger logger = Logger.getLogger(GrpclbLoadBalancer.class.getName()); private final LogId logId = LogId.allocate(getClass().getName()); - - private final Helper helper; private final SubchannelPool subchannelPool; - private final Factory pickFirstBalancerFactory; - private final Factory roundRobinBalancerFactory; private final ObjectPool<ScheduledExecutorService> timerServicePool; - private final TimeProvider time; - private final BackoffPolicy.Provider backoffPolicyProvider; // All mutable states in this class are mutated ONLY from Channel Executor - private ScheduledExecutorService timerService; - // If not null, all work is delegated to it. - @Nullable - private LoadBalancer delegate; - private LbPolicy lbPolicy; - - // Null if lbPolicy != GRPCLB @Nullable private GrpclbState grpclbState; - GrpclbLoadBalancer(Helper helper, SubchannelPool subchannelPool, Factory pickFirstBalancerFactory, - Factory roundRobinBalancerFactory, ObjectPool<ScheduledExecutorService> timerServicePool, - TimeProvider time, BackoffPolicy.Provider backoffPolicyProvider) { - this.helper = checkNotNull(helper, "helper"); - this.pickFirstBalancerFactory = - checkNotNull(pickFirstBalancerFactory, "pickFirstBalancerFactory"); - this.roundRobinBalancerFactory = - checkNotNull(roundRobinBalancerFactory, "roundRobinBalancerFactory"); + GrpclbLoadBalancer( + Helper helper, + SubchannelPool subchannelPool, + ObjectPool<ScheduledExecutorService> timerServicePool, + TimeProvider time, + BackoffPolicy.Provider backoffPolicyProvider) { + checkNotNull(helper, "helper"); this.timerServicePool = checkNotNull(timerServicePool, "timerServicePool"); this.timerService = checkNotNull(timerServicePool.getObject(), "timerService"); - this.time = checkNotNull(time, "time provider"); - this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + checkNotNull(time, "time provider"); + checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); this.subchannelPool.init(helper, timerService); - setLbPolicy(LbPolicy.GRPCLB); + grpclbState = + new GrpclbState(helper, subchannelPool, time, timerService, backoffPolicyProvider, logId); } @Override @@ -95,19 +79,14 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { @Override public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { - if (delegate != null) { - delegate.handleSubchannelState(subchannel, newState); - return; - } - if (grpclbState != null) { - grpclbState.handleSubchannelState(subchannel, newState); - } + // grpclbState should never be null here since handleSubchannelState cannot be called while the + // lb is shutdown. + grpclbState.handleSubchannelState(subchannel, newState); } @Override public void handleResolvedAddressGroups( List<EquivalentAddressGroup> updatedServers, Attributes attributes) { - LbPolicy newLbPolicy = attributes.get(GrpclbConstants.ATTR_LB_POLICY); // LB addresses and backend addresses are treated separately List<LbAddressGroup> newLbAddressGroups = new ArrayList<LbAddressGroup>(); List<EquivalentAddressGroup> newBackendServers = new ArrayList<EquivalentAddressGroup>(); @@ -122,65 +101,10 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups); newBackendServers = Collections.unmodifiableList(newBackendServers); - - if (!newLbAddressGroups.isEmpty()) { - if (newLbPolicy != LbPolicy.GRPCLB) { - newLbPolicy = LbPolicy.GRPCLB; - logger.log( - Level.FINE, "[{0}] Switching to GRPCLB because there is at least one balancer", logId); - } - } - if (newLbPolicy == null) { - logger.log(Level.FINE, "[{0}] New config missing policy. Using PICK_FIRST", logId); - newLbPolicy = LbPolicy.PICK_FIRST; - } - - // Switch LB policy if requested - setLbPolicy(newLbPolicy); - - // Consume the new addresses - switch (lbPolicy) { - case PICK_FIRST: - case ROUND_ROBIN: - checkNotNull(delegate, "delegate should not be null. newLbPolicy=" + newLbPolicy); - delegate.handleResolvedAddressGroups(newBackendServers, attributes); - break; - case GRPCLB: - grpclbState.handleAddresses(newLbAddressGroups, newBackendServers); - break; - default: - // Do nothing - } - } - - private void setLbPolicy(LbPolicy newLbPolicy) { - if (newLbPolicy != lbPolicy) { - resetStates(); - switch (newLbPolicy) { - case PICK_FIRST: - delegate = checkNotNull(pickFirstBalancerFactory.newLoadBalancer(helper), - "pickFirstBalancerFactory.newLoadBalancer()"); - break; - case ROUND_ROBIN: - delegate = checkNotNull(roundRobinBalancerFactory.newLoadBalancer(helper), - "roundRobinBalancerFactory.newLoadBalancer()"); - break; - case GRPCLB: - grpclbState = new GrpclbState( - helper, subchannelPool, time, timerService, backoffPolicyProvider, logId); - break; - default: - // Do nohting - } - } - lbPolicy = newLbPolicy; + grpclbState.handleAddresses(newLbAddressGroups, newBackendServers); } private void resetStates() { - if (delegate != null) { - delegate.shutdown(); - delegate = null; - } if (grpclbState != null) { grpclbState.shutdown(); grpclbState = null; @@ -195,9 +119,6 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { @Override public void handleNameResolutionError(Status error) { - if (delegate != null) { - delegate.handleNameResolutionError(error); - } if (grpclbState != null) { grpclbState.propagateError(error); } @@ -208,14 +129,4 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { GrpclbState getGrpclbState() { return grpclbState; } - - @VisibleForTesting - LoadBalancer getDelegate() { - return delegate; - } - - @VisibleForTesting - LbPolicy getLbPolicy() { - return lbPolicy; - } } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java index 00159b480..e46095f5d 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancerFactory.java @@ -18,12 +18,10 @@ package io.grpc.grpclb; import io.grpc.ExperimentalApi; import io.grpc.LoadBalancer; -import io.grpc.PickFirstBalancerFactory; import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TimeProvider; -import io.grpc.util.RoundRobinLoadBalancerFactory; /** * A factory for {@link LoadBalancer}s that uses the GRPCLB protocol. @@ -46,8 +44,7 @@ public class GrpclbLoadBalancerFactory extends LoadBalancer.Factory { @Override public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { return new GrpclbLoadBalancer( - helper, new CachedSubchannelPool(), PickFirstBalancerFactory.getInstance(), - RoundRobinLoadBalancerFactory.getInstance(), + helper, new CachedSubchannelPool(), // TODO(zhangkun83): balancer sends load reporting RPCs from it, which also involves // channelExecutor thus may also run other tasks queued in the channelExecutor. If such // load should not be on the shared scheduled executor, we should use a combination of the diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index ff4b8eb11..322ba6d53 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -54,7 +54,6 @@ import io.grpc.ClientStreamTracer; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; -import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -63,7 +62,6 @@ import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; -import io.grpc.grpclb.GrpclbConstants.LbPolicy; import io.grpc.grpclb.GrpclbState.BackendEntry; import io.grpc.grpclb.GrpclbState.DropEntry; import io.grpc.grpclb.GrpclbState.ErrorEntry; @@ -158,14 +156,6 @@ public class GrpclbLoadBalancerTest { private final SerializingExecutor channelExecutor = new SerializingExecutor(MoreExecutors.directExecutor()); @Mock - private LoadBalancer.Factory pickFirstBalancerFactory; - @Mock - private LoadBalancer pickFirstBalancer; - @Mock - private LoadBalancer.Factory roundRobinBalancerFactory; - @Mock - private LoadBalancer roundRobinBalancer; - @Mock private ObjectPool<ScheduledExecutorService> timerServicePool; @Mock private BackoffPolicy.Provider backoffPolicyProvider; @@ -179,10 +169,6 @@ public class GrpclbLoadBalancerTest { @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - when(pickFirstBalancerFactory.newLoadBalancer(any(Helper.class))) - .thenReturn(pickFirstBalancer); - when(roundRobinBalancerFactory.newLoadBalancer(any(Helper.class))) - .thenReturn(roundRobinBalancer); mockLbService = mock(LoadBalancerGrpc.LoadBalancerImplBase.class, delegatesTo( new LoadBalancerGrpc.LoadBalancerImplBase() { @Override @@ -258,9 +244,11 @@ public class GrpclbLoadBalancerTest { when(backoffPolicy2.nextBackoffNanos()).thenReturn(10L, 100L); when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); balancer = new GrpclbLoadBalancer( - helper, subchannelPool, pickFirstBalancerFactory, roundRobinBalancerFactory, + helper, + subchannelPool, timerServicePool, - timeProvider, backoffPolicyProvider); + timeProvider, + backoffPolicyProvider); verify(subchannelPool).init(same(helper), same(timerService)); } @@ -399,8 +387,7 @@ public class GrpclbLoadBalancerTest { long loadReportIntervalMillis = 1983; List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); // Fallback timer is started as soon as address is resolved. @@ -625,8 +612,7 @@ public class GrpclbLoadBalancerTest { when(args.getHeaders()).thenReturn(headers); List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -656,8 +642,7 @@ public class GrpclbLoadBalancerTest { when(args.getHeaders()).thenReturn(headers); List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -734,11 +719,6 @@ public class GrpclbLoadBalancerTest { Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build(); deliverResolvedAddresses(resolvedServers, resolutionAttrs); - - verify(pickFirstBalancerFactory).newLoadBalancer(helper); - verify(pickFirstBalancer).handleResolvedAddressGroups(eq(resolvedServers), eq(resolutionAttrs)); - verifyNoMoreInteractions(roundRobinBalancerFactory); - verifyNoMoreInteractions(roundRobinBalancer); } @Test @@ -754,62 +734,11 @@ public class GrpclbLoadBalancerTest { List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true); EquivalentAddressGroup eag = resolvedServers.get(0); - Attributes resolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes resolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(resolvedServers, resolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); - assertNull(balancer.getDelegate()); verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0))); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - - verifyNoMoreInteractions(pickFirstBalancerFactory); - verifyNoMoreInteractions(pickFirstBalancer); - verifyNoMoreInteractions(roundRobinBalancerFactory); - verifyNoMoreInteractions(roundRobinBalancer); - } - - @Test - public void delegatingPickFirstThenNameResolutionFails() { - List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false); - - Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build(); - deliverResolvedAddresses(resolvedServers, resolutionAttrs); - - verify(pickFirstBalancerFactory).newLoadBalancer(helper); - verify(pickFirstBalancer).handleResolvedAddressGroups(eq(resolvedServers), eq(resolutionAttrs)); - - // Then let name resolution fail. The error will be passed directly to the delegate. - Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); - deliverNameResolutionError(error); - verify(pickFirstBalancer).handleNameResolutionError(error); - verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - verifyNoMoreInteractions(roundRobinBalancerFactory); - verifyNoMoreInteractions(roundRobinBalancer); - } - - @Test - public void delegatingRoundRobinThenNameResolutionFails() { - List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false, false); - - Attributes resolutionAttrs = Attributes.newBuilder() - .set(RESOLUTION_ATTR, "yeah") - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN) - .build(); - deliverResolvedAddresses(resolvedServers, resolutionAttrs); - - verify(roundRobinBalancerFactory).newLoadBalancer(helper); - verify(roundRobinBalancer).handleResolvedAddressGroups(resolvedServers, resolutionAttrs); - - // Then let name resolution fail. The error will be passed directly to the delegate. - Status error = Status.NOT_FOUND.withDescription("www.google.com not found"); - deliverNameResolutionError(error); - verify(roundRobinBalancer).handleNameResolutionError(error); - verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - verifyNoMoreInteractions(pickFirstBalancerFactory); - verifyNoMoreInteractions(pickFirstBalancer); } @Test @@ -817,12 +746,9 @@ public class GrpclbLoadBalancerTest { InOrder inOrder = inOrder(helper, subchannelPool); // Go to GRPCLB first List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); - assertNull(balancer.getDelegate()); verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); @@ -856,165 +782,13 @@ public class GrpclbLoadBalancerTest { any(Attributes.class)); } - @SuppressWarnings("unchecked") - @Test - public void switchPolicy() { - // Go to GRPCLB first - List<EquivalentAddressGroup> grpclbResolutionList = - createResolvedServerAddresses(true, false); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); - assertNull(balancer.getDelegate()); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - - // Switch to PICK_FIRST - List<EquivalentAddressGroup> pickFirstResolutionList = - createResolvedServerAddresses(false, false); - Attributes pickFirstResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build(); - verify(pickFirstBalancerFactory, never()).newLoadBalancer(any(Helper.class)); - assertEquals(1, lbRequestObservers.size()); - StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); - - verify(lbRequestObserver, never()).onCompleted(); - assertFalse(oobChannel.isShutdown()); - deliverResolvedAddresses(pickFirstResolutionList, pickFirstResolutionAttrs); - - verify(pickFirstBalancerFactory).newLoadBalancer(same(helper)); - // Only non-LB addresses are passed to the delegate - verify(pickFirstBalancer).handleResolvedAddressGroups( - eq(pickFirstResolutionList), same(pickFirstResolutionAttrs)); - assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy()); - assertSame(pickFirstBalancer, balancer.getDelegate()); - // GRPCLB connection is closed - verify(lbRequestObserver).onCompleted(); - assertTrue(oobChannel.isShutdown()); - // Switching away from GRPCLB will clear the subchannelPool - verify(subchannelPool).clear(); - - // Switch to ROUND_ROBIN - List<EquivalentAddressGroup> roundRobinResolutionList = - createResolvedServerAddresses(false, false, false); - Attributes roundRobinResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN).build(); - verify(roundRobinBalancerFactory, never()).newLoadBalancer(any(Helper.class)); - deliverResolvedAddresses(roundRobinResolutionList, roundRobinResolutionAttrs); - - verify(roundRobinBalancerFactory).newLoadBalancer(same(helper)); - // Only non-LB addresses are passed to the delegate - verify(roundRobinBalancer).handleResolvedAddressGroups( - eq(roundRobinResolutionList), same(roundRobinResolutionAttrs)); - assertSame(LbPolicy.ROUND_ROBIN, balancer.getLbPolicy()); - assertSame(roundRobinBalancer, balancer.getDelegate()); - - // Special case: if at least one address is loadbalancer, use GRPCLB no matter what the - // NameResolver says. - grpclbResolutionList = createResolvedServerAddresses(true, false, true, false); - grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); - assertNull(balancer.getDelegate()); - EquivalentAddressGroup combinedEag = new EquivalentAddressGroup( - Arrays.asList( - grpclbResolutionList.get(0).getAddresses().get(0), - grpclbResolutionList.get(2).getAddresses().get(0)), - lbAttributes(lbAuthority(0))); - verify(helper).createOobChannel(eq(combinedEag), eq(lbAuthority(0))); - assertEquals(1, fakeOobChannels.size()); - oobChannel = fakeOobChannels.poll(); - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); - - // Special case: PICK_FIRST is the default - pickFirstResolutionList = createResolvedServerAddresses(false, false, false); - pickFirstResolutionAttrs = Attributes.EMPTY; - verify(pickFirstBalancerFactory).newLoadBalancer(any(Helper.class)); - assertFalse(oobChannel.isShutdown()); - deliverResolvedAddresses(pickFirstResolutionList, pickFirstResolutionAttrs); - - verify(pickFirstBalancerFactory, times(2)).newLoadBalancer(same(helper)); - // Only non-LB addresses are passed to the delegate - verify(pickFirstBalancer).handleResolvedAddressGroups( - eq(pickFirstResolutionList), same(pickFirstResolutionAttrs)); - assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy()); - assertSame(pickFirstBalancer, balancer.getDelegate()); - // GRPCLB connection is closed - assertTrue(oobChannel.isShutdown()); - // Switching away from GRPCLB will clear the subchannelPool - verify(subchannelPool, times(2)).clear(); - } - - @Test - public void resetGrpclbWhenSwitchingAwayFromGrpclb() { - InOrder inOrder = inOrder(helper, subchannelPool); - List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); - deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); - assertNull(balancer.getDelegate()); - verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); - assertEquals(1, fakeOobChannels.size()); - ManagedChannel oobChannel = fakeOobChannels.poll(); - verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); - StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - - // Simulate receiving LB response - List<ServerEntry> backends = Arrays.asList(new ServerEntry("127.0.0.1", 2000, "token0001")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - lbResponseObserver.onNext(buildInitialResponse()); - lbResponseObserver.onNext(buildLbResponse(backends)); - - inOrder.verify(subchannelPool).takeOrCreateSubchannel( - eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)), - any(Attributes.class)); - assertEquals(1, mockSubchannels.size()); - Subchannel subchannel = mockSubchannels.poll(); - verify(subchannel).requestConnection(); - - // Switch to round-robin. GRPCLB streams and connections should be closed. - List<EquivalentAddressGroup> roundRobinResolutionList = - createResolvedServerAddresses(false, false, false); - Attributes roundRobinResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN).build(); - verify(lbRequestObserver, never()).onCompleted(); - verify(subchannelPool, never()).returnSubchannel(same(subchannel)); - assertFalse(oobChannel.isShutdown()); - deliverResolvedAddresses(roundRobinResolutionList, roundRobinResolutionAttrs); - - verify(lbRequestObserver).onCompleted(); - verify(subchannelPool).returnSubchannel(same(subchannel)); - assertTrue(oobChannel.isShutdown()); - assertTrue(oobChannel.isTerminated()); - assertSame(LbPolicy.ROUND_ROBIN, balancer.getLbPolicy()); - assertSame(roundRobinBalancer, balancer.getDelegate()); - assertNull(balancer.getGrpclbState()); - } - @Test public void grpclbUpdatedAddresses_avoidsReconnect() { List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true, false); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); ManagedChannel oobChannel = fakeOobChannels.poll(); assertEquals(1, lbRequestObservers.size()); @@ -1034,11 +808,9 @@ public class GrpclbLoadBalancerTest { public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() { List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true, false); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); ManagedChannel oobChannel = fakeOobChannels.poll(); assertEquals(1, lbRequestObservers.size()); @@ -1058,15 +830,12 @@ public class GrpclbLoadBalancerTest { public void grpclbWorking() { InOrder inOrder = inOrder(helper, subchannelPool); List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); // Fallback timer is started as soon as the addresses are resolved. assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); - assertNull(balancer.getDelegate()); verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); assertEquals(1, fakeOobChannels.size()); ManagedChannel oobChannel = fakeOobChannels.poll(); @@ -1273,11 +1042,9 @@ public class GrpclbLoadBalancerTest { // Create a resolution list with a mixture of balancer and backend addresses List<EquivalentAddressGroup> resolutionList = createResolvedServerAddresses(false, true, false); - Attributes resolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes resolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(resolutionList, resolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); // Attempted to connect to balancer @@ -1334,7 +1101,6 @@ public class GrpclbLoadBalancerTest { fallbackTestVerifyUseOfFallbackBackendLists( inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); - assertNull(balancer.getDelegate()); assertFalse(oobChannel.isShutdown()); verify(lbRequestObserver, never()).onCompleted(); } @@ -1344,7 +1110,6 @@ public class GrpclbLoadBalancerTest { //////////////////////////////////////////////////////// resolutionList = createResolvedServerAddresses(true, true); deliverResolvedAddresses(resolutionList, resolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); // New addresses are updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( @@ -1366,7 +1131,6 @@ public class GrpclbLoadBalancerTest { ////////////////////////////////////////////////// resolutionList = createResolvedServerAddresses(true, false, false); deliverResolvedAddresses(resolutionList, resolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); // New LB address is updated to the OobChannel inOrder.verify(helper).updateOobChannelAddresses( @@ -1416,7 +1180,6 @@ public class GrpclbLoadBalancerTest { /////////////////////////////////////////////////////////////// resolutionList = createResolvedServerAddresses(true, false); deliverResolvedAddresses(resolutionList, resolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); // Will not affect the round robin list at all inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); @@ -1449,11 +1212,9 @@ public class GrpclbLoadBalancerTest { // Create a resolution list with a mixture of balancer and backend addresses List<EquivalentAddressGroup> resolutionList = createResolvedServerAddresses(false, true, false); - Attributes resolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes resolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(resolutionList, resolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); // Attempted to connect to balancer @@ -1609,12 +1370,9 @@ public class GrpclbLoadBalancerTest { new FakeSocketAddress("fake-address-3")), lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); - assertNull(balancer.getDelegate()); verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1"); } @@ -1628,8 +1386,7 @@ public class GrpclbLoadBalancerTest { InOrder inOrder = inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true); - Attributes grpclbResolutionAttrs = Attributes.newBuilder() - .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); assertEquals(1, fakeOobChannels.size()); |