diff options
author | Kun Zhang <zhangkun83@users.noreply.github.com> | 2017-09-12 12:25:08 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-12 12:25:08 -0700 |
commit | df92533524ab6d02d1b90370197518b0f2cfde57 (patch) | |
tree | 61e262374bfb04a146c30da9f3d9ede31d2c5ead /grpclb | |
parent | ec600feb87b5da620cda2158bcbcd3c582359878 (diff) | |
download | grpc-grpc-java-df92533524ab6d02d1b90370197518b0f2cfde57.tar.gz |
grpclb: fallback to backend addresses (#3439)
The GRPCLB client will use the backend addresses from resolver if it has not received any server list from any balancer after a certain timeout (10s).
Diffstat (limited to 'grpclb')
4 files changed, 462 insertions, 97 deletions
diff --git a/grpclb/src/main/java/io/grpc/grpclb/BackendAddressGroup.java b/grpclb/src/main/java/io/grpc/grpclb/BackendAddressGroup.java new file mode 100644 index 000000000..8e134b0ed --- /dev/null +++ b/grpclb/src/main/java/io/grpc/grpclb/BackendAddressGroup.java @@ -0,0 +1,42 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.grpclb; + +import static com.google.common.base.Preconditions.checkNotNull; + +import io.grpc.EquivalentAddressGroup; +import javax.annotation.Nullable; + +final class BackendAddressGroup { + private final EquivalentAddressGroup addresses; + @Nullable + private final String token; + + BackendAddressGroup(EquivalentAddressGroup addresses, @Nullable String token) { + this.addresses = checkNotNull(addresses, "addresses"); + this.token = token; + } + + EquivalentAddressGroup getAddresses() { + return addresses; + } + + @Nullable + String getToken() { + return token; + } +} diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 0cb851163..2ef6a1901 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -28,8 +28,8 @@ import io.grpc.grpclb.GrpclbConstants.LbPolicy; import io.grpc.internal.LogId; import io.grpc.internal.ObjectPool; import io.grpc.internal.WithLogId; -import java.net.SocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; @@ -112,6 +112,9 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { } } + newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups); + newBackendServers = Collections.unmodifiableList(newBackendServers); + if (!newLbAddressGroups.isEmpty()) { if (newLbPolicy != LbPolicy.GRPCLB) { newLbPolicy = LbPolicy.GRPCLB; @@ -139,7 +142,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { grpclbState.propagateError(Status.UNAVAILABLE.withDescription( "NameResolver returned no LB address while asking for GRPCLB")); } else { - grpclbState.setLbAddress(flattenLbAddressGroups(newLbAddressGroups)); + grpclbState.updateAddresses(newLbAddressGroups, newBackendServers); } break; default: @@ -211,35 +214,4 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId { LbPolicy getLbPolicy() { return lbPolicy; } - - private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) { - assert !groupList.isEmpty(); - List<EquivalentAddressGroup> eags = new ArrayList<EquivalentAddressGroup>(groupList.size()); - String authority = groupList.get(0).getAuthority(); - for (LbAddressGroup group : groupList) { - if (!authority.equals(group.getAuthority())) { - // TODO(ejona): Allow different authorities for different addresses. Requires support from - // Helper. - logger.log(Level.WARNING, - "[{0}] Multiple authorities found for LB. " - + "Skipping addresses for {0} in preference to {1}", - new Object[] {logId, group.getAuthority(), authority}); - } else { - eags.add(group.getAddresses()); - } - } - return new LbAddressGroup(flattenEquivalentAddressGroup(eags), authority); - } - - /** - * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object. - */ - private static EquivalentAddressGroup flattenEquivalentAddressGroup( - List<EquivalentAddressGroup> groupList) { - List<SocketAddress> addrs = new ArrayList<SocketAddress>(); - for (EquivalentAddressGroup group : groupList) { - addrs.addAll(group.getAddresses()); - } - return new EquivalentAddressGroup(addrs); - } } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 39c4368a6..203f055e4 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -46,6 +46,7 @@ import io.grpc.internal.LogId; import io.grpc.stub.StreamObserver; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; @@ -68,12 +69,12 @@ import javax.annotation.concurrent.NotThreadSafe; * GrpclbLoadBalancer switches to GRPCLB mode. Closed and discarded when GrpclbLoadBalancer * switches away from GRPCLB mode. */ -// TODO(zhangkun83): round-robin on the backend list from the resolver if we don't get a server list -// within a configurable timeout. @NotThreadSafe final class GrpclbState { private static final Logger logger = Logger.getLogger(GrpclbState.class.getName()); + static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); + @VisibleForTesting static final PickResult DROP_PICK_RESULT = PickResult.withError(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer")); @@ -84,6 +85,11 @@ final class GrpclbState { public PickResult picked(Metadata headers) { return PickResult.withNoResult(); } + + @Override + public String toString() { + return "BUFFER_ENTRY"; + } }; private final LogId logId; @@ -95,6 +101,13 @@ final class GrpclbState { private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO = Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); + // Once set, never go back to null. + @Nullable + private ScheduledFuture<?> fallbackTimer; + private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList(); + private boolean fallbackTimerExpired; + private boolean receivedServerListFromBalancer; + @Nullable private ManagedChannel lbCommChannel; @@ -132,9 +145,11 @@ final class GrpclbState { } /** - * Set the address of the balancer, and create connection if not yet connected. + * Set the new addresses of the balancer and backends, and create connection if not yet connected. */ - void setLbAddress(LbAddressGroup newLbAddressGroup) { + void updateAddresses( + List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) { + LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups); startLbComm(newLbAddressGroup); // Avoid creating a new RPC just because the addresses were updated, as it can cause a // stampeding herd. The current RPC may be on a connection to an address not present in @@ -143,6 +158,30 @@ final class GrpclbState { if (lbStream == null) { startLbRpc(); } + // If we don't receive server list from the balancer within the timeout, we round-robin on + // the backend list from the resolver (aka fallback), until the balancer returns a server list. + fallbackBackendList = newBackendServers; + if (fallbackTimer == null) { + fallbackTimer = + timerService.schedule(new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } else { + maybeUseFallbackBackends(); + } + } + + private void maybeUseFallbackBackends() { + // Only use fallback backends after fallback timer expired and before receiving server list from + // the balancer. + if (receivedServerListFromBalancer || !fallbackTimerExpired) { + return; + } + List<DropEntry> newDropList = new ArrayList<DropEntry>(); + List<BackendAddressGroup> newBackendAddrList = new ArrayList<BackendAddressGroup>(); + for (EquivalentAddressGroup eag : fallbackBackendList) { + newDropList.add(null); + newBackendAddrList.add(new BackendAddressGroup(eag, null)); + } + updateRoundRobinLists(newDropList, newBackendAddrList, null); } private void shutdownLbComm() { @@ -179,6 +218,7 @@ final class GrpclbState { checkState(lbStream == null, "previous lbStream has not been cleared yet"); LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel); lbStream = new LbStream(stub); + lbStream.start(); LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder() .setInitialRequest(InitialLoadBalanceRequest.newBuilder() @@ -217,33 +257,106 @@ final class GrpclbState { return lbStream.loadRecorder; } + private void updateRoundRobinLists( + List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList, + @Nullable GrpclbClientLoadRecorder loadRecorder) { + HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap = + new HashMap<EquivalentAddressGroup, Subchannel>(); + List<BackendEntry> newBackendList = new ArrayList<BackendEntry>(); + + for (BackendAddressGroup backendAddr : newBackendAddrList) { + EquivalentAddressGroup eag = backendAddr.getAddresses(); + Subchannel subchannel = newSubchannelMap.get(eag); + if (subchannel == null) { + subchannel = subchannels.get(eag); + if (subchannel == null) { + Attributes subchannelAttrs = Attributes.newBuilder() + .set(STATE_INFO, + new AtomicReference<ConnectivityStateInfo>( + ConnectivityStateInfo.forNonError(IDLE))) + .build(); + subchannel = helper.createSubchannel(eag, subchannelAttrs); + subchannel.requestConnection(); + } + newSubchannelMap.put(eag, subchannel); + } + BackendEntry entry; + // Only picks with tokens are reported to LoadRecorder + if (backendAddr.getToken() == null) { + entry = new BackendEntry(subchannel); + } else { + entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken()); + } + newBackendList.add(entry); + } + + // Close Subchannels whose addresses have been delisted + for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) { + EquivalentAddressGroup eag = entry.getKey(); + if (!newSubchannelMap.containsKey(eag)) { + entry.getValue().shutdown(); + } + } + + subchannels = Collections.unmodifiableMap(newSubchannelMap); + dropList = Collections.unmodifiableList(newDropList); + backendList = Collections.unmodifiableList(newBackendList); + maybeUpdatePicker(); + } + + @VisibleForTesting + class FallbackModeTask implements Runnable { + @Override + public void run() { + helper.runSerialized(new Runnable() { + @Override + public void run() { + fallbackTimerExpired = true; + maybeUseFallbackBackends(); + } + }); + } + } + + @VisibleForTesting + class LoadReportingTask implements Runnable { + private final LbStream stream; + + LoadReportingTask(LbStream stream) { + this.stream = stream; + } + + @Override + public void run() { + helper.runSerialized(new Runnable() { + @Override + public void run() { + stream.loadReportFuture = null; + stream.sendLoadReport(); + } + }); + } + } + private class LbStream implements StreamObserver<LoadBalanceResponse> { - final StreamObserver<LoadBalanceRequest> lbRequestWriter; final GrpclbClientLoadRecorder loadRecorder; - - final Runnable loadReportRunnable = new Runnable() { - @Override - public void run() { - helper.runSerialized(new Runnable() { - @Override - public void run() { - loadReportTask = null; - sendLoadReport(); - } - }); - } - }; + final LoadBalancerGrpc.LoadBalancerStub stub; + StreamObserver<LoadBalanceRequest> lbRequestWriter; // These fields are only accessed from helper.runSerialized() boolean initialResponseReceived; boolean closed; long loadReportIntervalMillis = -1; - ScheduledFuture<?> loadReportTask; + ScheduledFuture<?> loadReportFuture; LbStream(LoadBalancerGrpc.LoadBalancerStub stub) { + this.stub = checkNotNull(stub, "stub"); // Stats data only valid for current LbStream. We do not carry over data from previous // stream. loadRecorder = new GrpclbClientLoadRecorder(time); + } + + void start() { lbRequestWriter = stub.withWaitForReady().balanceLoad(this); } @@ -294,8 +407,8 @@ final class GrpclbState { private void scheduleNextLoadReport() { if (loadReportIntervalMillis > 0) { - loadReportTask = timerService.schedule( - loadReportRunnable, loadReportIntervalMillis, TimeUnit.MILLISECONDS); + loadReportFuture = timerService.schedule( + new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS); } } @@ -330,12 +443,14 @@ final class GrpclbState { return; } + receivedServerListFromBalancer = true; + if (fallbackTimer != null) { + fallbackTimer.cancel(false); + } // TODO(zhangkun83): handle delegate from initialResponse ServerList serverList = response.getServerList(); - HashMap<EquivalentAddressGroup, Subchannel> newSubchannelMap = - new HashMap<EquivalentAddressGroup, Subchannel>(); List<DropEntry> newDropList = new ArrayList<DropEntry>(); - List<BackendEntry> newBackendList = new ArrayList<BackendEntry>(); + List<BackendAddressGroup> newBackendAddrList = new ArrayList<BackendAddressGroup>(); // Construct the new collections. Create new Subchannels when necessary. for (Server server : serverList.getServersList()) { String token = server.getLoadBalanceToken(); @@ -352,35 +467,10 @@ final class GrpclbState { continue; } EquivalentAddressGroup eag = new EquivalentAddressGroup(address); - Subchannel subchannel = newSubchannelMap.get(eag); - if (subchannel == null) { - subchannel = subchannels.get(eag); - if (subchannel == null) { - Attributes subchannelAttrs = Attributes.newBuilder() - .set(STATE_INFO, - new AtomicReference<ConnectivityStateInfo>( - ConnectivityStateInfo.forNonError(IDLE))) - .build(); - subchannel = helper.createSubchannel(eag, subchannelAttrs); - subchannel.requestConnection(); - } - newSubchannelMap.put(eag, subchannel); - } - newBackendList.add(new BackendEntry(subchannel, loadRecorder, token)); - } - } - // Close Subchannels whose addresses have been delisted - for (Entry<EquivalentAddressGroup, Subchannel> entry : subchannels.entrySet()) { - EquivalentAddressGroup eag = entry.getKey(); - if (!newSubchannelMap.containsKey(eag)) { - entry.getValue().shutdown(); + newBackendAddrList.add(new BackendAddressGroup(eag, token)); } } - - subchannels = Collections.unmodifiableMap(newSubchannelMap); - dropList = Collections.unmodifiableList(newDropList); - backendList = Collections.unmodifiableList(newBackendList); - maybeUpdatePicker(); + updateRoundRobinLists(newDropList, newBackendAddrList, loadRecorder); } private void handleStreamClosed(Status status) { @@ -411,9 +501,9 @@ final class GrpclbState { } private void cleanUp() { - if (loadReportTask != null) { - loadReportTask.cancel(false); - loadReportTask = null; + if (loadReportFuture != null) { + loadReportFuture.cancel(false); + loadReportFuture = null; } if (lbStream == this) { lbStream = null; @@ -479,6 +569,37 @@ final class GrpclbState { helper.updateBalancingState(state, picker); } + private LbAddressGroup flattenLbAddressGroups(List<LbAddressGroup> groupList) { + assert !groupList.isEmpty(); + List<EquivalentAddressGroup> eags = new ArrayList<EquivalentAddressGroup>(groupList.size()); + String authority = groupList.get(0).getAuthority(); + for (LbAddressGroup group : groupList) { + if (!authority.equals(group.getAuthority())) { + // TODO(ejona): Allow different authorities for different addresses. Requires support from + // Helper. + logger.log(Level.WARNING, + "[{0}] Multiple authorities found for LB. " + + "Skipping addresses for {0} in preference to {1}", + new Object[] {logId, group.getAuthority(), authority}); + } else { + eags.add(group.getAddresses()); + } + } + return new LbAddressGroup(flattenEquivalentAddressGroup(eags), authority); + } + + /** + * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object. + */ + private static EquivalentAddressGroup flattenEquivalentAddressGroup( + List<EquivalentAddressGroup> groupList) { + List<SocketAddress> addrs = new ArrayList<SocketAddress>(); + for (EquivalentAddressGroup group : groupList) { + addrs.addAll(group.getAddresses()); + } + return new EquivalentAddressGroup(addrs); + } + @VisibleForTesting static final class DropEntry { private final GrpclbClientLoadRecorder loadRecorder; @@ -525,19 +646,35 @@ final class GrpclbState { static final class BackendEntry implements RoundRobinEntry { @VisibleForTesting final PickResult result; + @Nullable private final GrpclbClientLoadRecorder loadRecorder; + @Nullable private final String token; + /** + * Creates a BackendEntry whose usage will be reported to load recorder. + */ BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) { this.result = PickResult.withSubchannel(subchannel, loadRecorder); this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder"); this.token = checkNotNull(token, "token"); } + /** + * Creates a BackendEntry whose usage will not be reported. + */ + BackendEntry(Subchannel subchannel) { + this.result = PickResult.withSubchannel(subchannel); + this.loadRecorder = null; + this.token = null; + } + @Override public PickResult picked(Metadata headers) { headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); - headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); + if (token != null) { + headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); + } return result; } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index c57d38f3a..23c2efb51 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -82,6 +82,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -101,6 +102,20 @@ public class GrpclbLoadBalancerTest { private static final Attributes.Key<String> RESOLUTION_ATTR = Attributes.Key.of("resolution-attr"); private static final String SERVICE_AUTHORITY = "api.google.com"; + private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command instanceof GrpclbState.LoadReportingTask; + } + }; + private static final FakeClock.TaskFilter FALLBACK_MODE_TASK_FILTER = + new FakeClock.TaskFilter() { + @Override + public boolean shouldAccept(Runnable command) { + return command instanceof GrpclbState.FallbackModeTask; + } + }; @Mock private Helper helper; @@ -356,11 +371,11 @@ public class GrpclbLoadBalancerTest { .build())); // Simulate receiving LB response - assertEquals(0, fakeClock.numPendingTasks()); + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // Load reporting task is scheduled - assertEquals(1, fakeClock.numPendingTasks()); + assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); assertEquals(0, fakeClock.runDueTasks()); List<ServerEntry> backends = Arrays.asList( @@ -569,18 +584,19 @@ public class GrpclbLoadBalancerTest { StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); // Simulate LB initial response - assertEquals(0, fakeClock.numPendingTasks()); + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); lbResponseObserver.onNext(buildInitialResponse(1983)); // Load reporting task is scheduled - assertEquals(1, fakeClock.numPendingTasks()); + assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next(); assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); // Simulate an abundant LB initial response, with a different report interval lbResponseObserver.onNext(buildInitialResponse(9097)); // It doesn't affect load-reporting at all - assertThat(fakeClock.getPendingTasks()).containsExactly(scheduledTask); + assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER)) + .containsExactly(scheduledTask); assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); } @@ -607,11 +623,11 @@ public class GrpclbLoadBalancerTest { .build())); // Simulate receiving LB response - assertEquals(0, fakeClock.numPendingTasks()); + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); lbResponseObserver.onNext(buildInitialResponse(1983)); // Load reporting task is scheduled - assertEquals(1, fakeClock.numPendingTasks()); + assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); FakeClock.ScheduledTask scheduledTask = fakeClock.getPendingTasks().iterator().next(); assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS)); @@ -619,14 +635,14 @@ public class GrpclbLoadBalancerTest { lbResponseObserver.onCompleted(); // Reporting task cancelled - assertEquals(0, fakeClock.numPendingTasks()); + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); // Simulate a race condition where the task has just started when its cancelled scheduledTask.command.run(); // No report sent. No new task scheduled inOrder.verify(lbRequestObserver, never()).onNext(any(LoadBalanceRequest.class)); - assertEquals(0, fakeClock.numPendingTasks()); + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); } private void assertNextReport( @@ -1001,6 +1017,9 @@ public class GrpclbLoadBalancerTest { InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); + // Timer for fallback mode is registered + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // Simulate receiving LB response List<ServerEntry> backends1 = Arrays.asList( new ServerEntry("127.0.0.1", 2000, "token0001"), @@ -1014,6 +1033,8 @@ public class GrpclbLoadBalancerTest { eq(new EquivalentAddressGroup(backends1.get(0).addr)), any(Attributes.class)); inOrder.verify(helper).createSubchannel( eq(new EquivalentAddressGroup(backends1.get(1).addr)), any(Attributes.class)); + // Timer for fallback mode is cancelled as soon as the balancer returns a server list + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); assertEquals(2, mockSubchannels.size()); Subchannel subchannel1 = mockSubchannels.poll(); Subchannel subchannel2 = mockSubchannels.poll(); @@ -1162,6 +1183,199 @@ public class GrpclbLoadBalancerTest { } @Test + public void grpclbFallbackToBackendsFromResolver() { + long loadReportIntervalMillis = 1983; + InOrder helperInOrder = inOrder(helper); + + // Create a resolution list with a mixture of balancer and backend addresses + List<EquivalentAddressGroup> resolutionList = + createResolvedServerAddresses(false, true, false); + Attributes resolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + deliverResolvedAddresses(resolutionList, resolutionAttrs); + + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + helperInOrder.verify(helper).createOobChannel( + addrsEq(resolutionList.get(1)), eq(lbAuthority(0))); + + // Attempted to connect to balancer + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + // Receiving the initial response won't reset the fallback timer. Only reciving the server list + // does. + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if runSerialized() has been run. + helperInOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); + helperInOrder.verifyNoMoreInteractions(); + + //////////////////////////// + // Fallback timer expires + //////////////////////////// + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + // Fall back to the backends from resolver + fallbackTestVerifyUseOfFallbackBackendLists( + helperInOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + + assertNull(balancer.getDelegate()); + assertFalse(oobChannel.isShutdown()); + verify(lbRequestObserver, never()).onCompleted(); + + //////////////////////////////////////////////////////// + // Name resolver sends new list without any backend addr + //////////////////////////////////////////////////////// + resolutionList = createResolvedServerAddresses(true, true); + deliverResolvedAddresses(resolutionList, resolutionAttrs); + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + + // New addresses are updated to the OobChannel + helperInOrder.verify(helper).updateOobChannelAddresses( + same(oobChannel), + eq(new EquivalentAddressGroup( + Arrays.asList( + resolutionList.get(0).getAddresses().get(0), + resolutionList.get(1).getAddresses().get(0))))); + + // Still in fallback logic, except that the backend list is empty + fallbackTestVerifyUseOfFallbackBackendLists( + helperInOrder, helper, Collections.<EquivalentAddressGroup>emptyList()); + + ////////////////////////////////////////////////// + // Name resolver sends new list with backend addrs + ////////////////////////////////////////////////// + resolutionList = createResolvedServerAddresses(true, false, false); + deliverResolvedAddresses(resolutionList, resolutionAttrs); + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + + // New LB address is updated to the OobChannel + helperInOrder.verify(helper).updateOobChannelAddresses( + same(oobChannel), + addrsEq(resolutionList.get(0))); + + // New backend addresses are used for fallback + fallbackTestVerifyUseOfFallbackBackendLists( + helperInOrder, helper, Arrays.asList(resolutionList.get(1), resolutionList.get(2))); + + /////////////////////// + // Break the LB stream + /////////////////////// + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + + // The error will NOT propagate to picker because fallback list is in use. + helperInOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + + // A new stream is created + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + + ///////////////////////////////// + // Balancer returns a server list + ///////////////////////////////// + List<ServerEntry> serverList = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("127.0.0.1", 2010, "token0002")); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(serverList)); + + // Fallback mode ends + fallbackTestVerifyUseOfBalancerBackendLists(helperInOrder, helper, serverList); + + /////////////////////////////////////////////////////////////// + // New backend addresses from resolver outside of fallback mode + /////////////////////////////////////////////////////////////// + resolutionList = createResolvedServerAddresses(true, false); + deliverResolvedAddresses(resolutionList, resolutionAttrs); + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + // Will not affect the round robin list at all + helperInOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + + // Fallback mode is one-shot only. + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + } + + private void fallbackTestVerifyUseOfFallbackBackendLists( + InOrder inOrder, Helper helper, List<EquivalentAddressGroup> addrs) { + fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, null); + } + + private void fallbackTestVerifyUseOfBalancerBackendLists( + InOrder inOrder, Helper helper, List<ServerEntry> servers) { + ArrayList<EquivalentAddressGroup> addrs = new ArrayList<EquivalentAddressGroup>(); + ArrayList<String> tokens = new ArrayList<String>(); + for (ServerEntry server : servers) { + addrs.add(new EquivalentAddressGroup(server.addr)); + tokens.add(server.token); + } + fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, tokens); + } + + private void fallbackTestVerifyUseOfBackendLists( + InOrder inOrder, Helper helper, List<EquivalentAddressGroup> addrs, + @Nullable List<String> tokens) { + if (tokens != null) { + assertEquals(addrs.size(), tokens.size()); + } + for (EquivalentAddressGroup addr : addrs) { + inOrder.verify(helper).createSubchannel(addrsEq(addr), any(Attributes.class)); + } + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); + assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY); + assertEquals(addrs.size(), mockSubchannels.size()); + ArrayList<Subchannel> subchannels = new ArrayList<Subchannel>(mockSubchannels); + mockSubchannels.clear(); + for (Subchannel subchannel : subchannels) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); + } + inOrder.verify(helper, atLeast(0)) + .updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + + ArrayList<BackendEntry> pickList = new ArrayList<BackendEntry>(); + for (int i = 0; i < addrs.size(); i++) { + Subchannel subchannel = subchannels.get(i); + BackendEntry backend; + if (tokens == null) { + backend = new BackendEntry(subchannel); + } else { + backend = new BackendEntry(subchannel, getLoadRecorder(), tokens.get(i)); + } + pickList.add(backend); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList) + .containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); + assertThat(picker.pickList).containsExactlyElementsIn(pickList); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + } + } + + @Test public void grpclbMultipleAuthorities() throws Exception { List<EquivalentAddressGroup> grpclbResolutionList = Arrays.asList( new EquivalentAddressGroup( |