diff options
author | Daniel Liu <danielztliu@google.com> | 2023-07-11 13:29:31 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-11 13:29:31 -0700 |
commit | 4fa2814d65b2536aede30e1f24c461a2f42be1f7 (patch) | |
tree | ca46c135cf87eede68f82a7356f94d2e0bb5cb40 | |
parent | 8dbd47ceb5e9d1140186bb37a767a3a3eb817af3 (diff) | |
download | grpc-grpc-java-4fa2814d65b2536aede30e1f24c461a2f42be1f7.tar.gz |
services, xds, orca: LRS named metrics support (#10282)
Implements gRFC A64: xDS LRS Custom Metrics Support
17 files changed, 343 insertions, 58 deletions
diff --git a/services/src/main/java/io/grpc/services/CallMetricRecorder.java b/services/src/main/java/io/grpc/services/CallMetricRecorder.java index f491fb754..d480f0f4c 100644 --- a/services/src/main/java/io/grpc/services/CallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/CallMetricRecorder.java @@ -41,6 +41,8 @@ public final class CallMetricRecorder { new AtomicReference<>(); private final AtomicReference<ConcurrentHashMap<String, Double>> requestCostMetrics = new AtomicReference<>(); + private final AtomicReference<ConcurrentHashMap<String, Double>> namedMetrics = + new AtomicReference<>(); private double cpuUtilizationMetric = 0; private double applicationUtilizationMetric = 0; private double memoryUtilizationMetric = 0; @@ -128,6 +130,27 @@ public final class CallMetricRecorder { } /** + * Records an application-specific opaque custom metric measurement. If RPC has already finished, + * this method is no-op. + * + * <p>A latter record will overwrite its former name-sakes. + * + * @return this recorder object + */ + public CallMetricRecorder recordNamedMetric(String name, double value) { + if (disabled) { + return this; + } + if (namedMetrics.get() == null) { + // The chance of race of creation of the map should be very small, so it should be fine + // to create these maps that might be discarded. + namedMetrics.compareAndSet(null, new ConcurrentHashMap<String, Double>()); + } + namedMetrics.get().put(name, value); + return this; + } + + /** * Records a call metric measurement for CPU utilization in the range [0, inf). Values outside the * valid range are ignored. If RPC has already finished, this method is no-op. * @@ -235,12 +258,17 @@ public final class CallMetricRecorder { MetricReport finalizeAndDump2() { Map<String, Double> savedRequestCostMetrics = finalizeAndDump(); Map<String, Double> savedUtilizationMetrics = utilizationMetrics.get(); + Map<String, Double> savedNamedMetrics = namedMetrics.get(); if (savedUtilizationMetrics == null) { savedUtilizationMetrics = Collections.emptyMap(); } + if (savedNamedMetrics == null) { + savedNamedMetrics = Collections.emptyMap(); + } return new MetricReport(cpuUtilizationMetric, applicationUtilizationMetric, memoryUtilizationMetric, qps, eps, Collections.unmodifiableMap(savedRequestCostMetrics), - Collections.unmodifiableMap(savedUtilizationMetrics) + Collections.unmodifiableMap(savedUtilizationMetrics), + Collections.unmodifiableMap(savedNamedMetrics) ); } diff --git a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java index 3b0cbbbda..a7ff1e5c3 100644 --- a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java @@ -47,8 +47,9 @@ public final class InternalCallMetricRecorder { public static MetricReport createMetricReport(double cpuUtilization, double applicationUtilization, double memoryUtilization, double qps, double eps, - Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics) { + Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics, + Map<String, Double> namedMetrics) { return new MetricReport(cpuUtilization, applicationUtilization, memoryUtilization, qps, eps, - requestCostMetrics, utilizationMetrics); + requestCostMetrics, utilizationMetrics, namedMetrics); } } diff --git a/services/src/main/java/io/grpc/services/MetricRecorder.java b/services/src/main/java/io/grpc/services/MetricRecorder.java index c585e7b54..39b0e8df2 100644 --- a/services/src/main/java/io/grpc/services/MetricRecorder.java +++ b/services/src/main/java/io/grpc/services/MetricRecorder.java @@ -155,6 +155,6 @@ public final class MetricRecorder { MetricReport getMetricReport() { return new MetricReport(cpuUtilization, applicationUtilization, memoryUtilization, qps, eps, - Collections.emptyMap(), Collections.unmodifiableMap(metricsData)); + Collections.emptyMap(), Collections.unmodifiableMap(metricsData), Collections.emptyMap()); } } diff --git a/services/src/main/java/io/grpc/services/MetricReport.java b/services/src/main/java/io/grpc/services/MetricReport.java index 35cbfc056..e559f0f00 100644 --- a/services/src/main/java/io/grpc/services/MetricReport.java +++ b/services/src/main/java/io/grpc/services/MetricReport.java @@ -35,10 +35,11 @@ public final class MetricReport { private double eps; private Map<String, Double> requestCostMetrics; private Map<String, Double> utilizationMetrics; + private Map<String, Double> namedMetrics; MetricReport(double cpuUtilization, double applicationUtilization, double memoryUtilization, double qps, double eps, Map<String, Double> requestCostMetrics, - Map<String, Double> utilizationMetrics) { + Map<String, Double> utilizationMetrics, Map<String, Double> namedMetrics) { this.cpuUtilization = cpuUtilization; this.applicationUtilization = applicationUtilization; this.memoryUtilization = memoryUtilization; @@ -46,6 +47,7 @@ public final class MetricReport { this.eps = eps; this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics"); this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics"); + this.namedMetrics = checkNotNull(namedMetrics, "namedMetrics"); } public double getCpuUtilization() { @@ -68,6 +70,10 @@ public final class MetricReport { return utilizationMetrics; } + public Map<String, Double> getNamedMetrics() { + return namedMetrics; + } + public double getQps() { return qps; } @@ -84,6 +90,7 @@ public final class MetricReport { .add("memoryUtilization", memoryUtilization) .add("requestCost", requestCostMetrics) .add("utilization", utilizationMetrics) + .add("named", namedMetrics) .add("qps", qps) .add("eps", eps) .toString(); diff --git a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java index cb0bfc5d8..f60446b1a 100644 --- a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java +++ b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java @@ -44,6 +44,9 @@ public class CallMetricRecorderTest { recorder.recordRequestCostMetric("cost1", 37465.12); recorder.recordRequestCostMetric("cost2", 10293.0); recorder.recordRequestCostMetric("cost3", 1.0); + recorder.recordNamedMetric("named1", 0.2233); + recorder.recordNamedMetric("named2", -1.618); + recorder.recordNamedMetric("named3", 3.1415926535); recorder.recordCpuUtilizationMetric(0.1928); recorder.recordApplicationUtilizationMetric(0.9987); recorder.recordMemoryUtilizationMetric(0.474); @@ -55,6 +58,8 @@ public class CallMetricRecorderTest { .containsExactly("util1", 0.154353423, "util2", 0.1367, "util3", 0.143734); Truth.assertThat(dump.getRequestCostMetrics()) .containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0); + Truth.assertThat(dump.getNamedMetrics()) + .containsExactly("named1", 0.2233, "named2", -1.618, "named3", 3.1415926535); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928); Truth.assertThat(dump.getApplicationUtilization()).isEqualTo(0.9987); Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.474); @@ -62,6 +67,9 @@ public class CallMetricRecorderTest { Truth.assertThat(dump.getEps()).isEqualTo(1.618); Truth.assertThat(dump.toString()).contains("eps=1.618"); Truth.assertThat(dump.toString()).contains("applicationUtilization=0.9987"); + Truth.assertThat(dump.toString()).contains("named1=0.2233"); + Truth.assertThat(dump.toString()).contains("named2=-1.618"); + Truth.assertThat(dump.toString()).contains("named3=3.1415926535"); } @Test @@ -71,6 +79,7 @@ public class CallMetricRecorderTest { recorder.recordUtilizationMetric("cost", 0.154353423); recorder.recordQpsMetric(3.14159); recorder.recordEpsMetric(1.618); + recorder.recordNamedMetric("named1", 2.718); assertThat(recorder.finalizeAndDump()).isEqualTo(initDump); } @@ -121,6 +130,9 @@ public class CallMetricRecorderTest { recorder.recordUtilizationMetric("util1", 0.2837421); recorder.recordMemoryUtilizationMetric(0.93840); recorder.recordUtilizationMetric("util1", 0.843233); + recorder.recordNamedMetric("named1", 0.2233); + recorder.recordNamedMetric("named2", 2.718); + recorder.recordNamedMetric("named1", 3.1415926535); recorder.recordQpsMetric(1928.3); recorder.recordQpsMetric(100.8); recorder.recordEpsMetric(3.14159); @@ -133,6 +145,8 @@ public class CallMetricRecorderTest { Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.93840); Truth.assertThat(dump.getUtilizationMetrics()) .containsExactly("util1", 0.843233); + Truth.assertThat(dump.getNamedMetrics()) + .containsExactly("named1", 3.1415926535, "named2", 2.718); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0); Truth.assertThat(dump.getQps()).isEqualTo(100.8); Truth.assertThat(dump.getEps()).isEqualTo(1.618); diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 074bb301b..b2be811d5 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -33,6 +33,7 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.ForwardingClientStreamTracer; import io.grpc.internal.ObjectPool; +import io.grpc.services.MetricReport; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingSubchannel; import io.grpc.util.GracefulSwitchLoadBalancer; @@ -47,6 +48,8 @@ import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import io.grpc.xds.internal.security.SslContextProviderSupplier; +import io.grpc.xds.orca.OrcaPerRequestUtil; +import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -329,7 +332,9 @@ final class ClusterImplLoadBalancer extends LoadBalancer { if (stats != null) { ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory( stats, inFlights, result.getStreamTracerFactory()); - return PickResult.withSubchannel(result.getSubchannel(), tracerFactory); + ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance() + .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats)); + return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory); } } return result; @@ -386,4 +391,22 @@ final class ClusterImplLoadBalancer extends LoadBalancer { }; } } + + private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener { + + private final ClusterLocalityStats stats; + + private OrcaPerRpcListener(ClusterLocalityStats stats) { + this.stats = checkNotNull(stats, "stats"); + } + + /** + * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is + * included in the snapshot for the LRS report sent to the LRS server. + */ + @Override + public void onLoadReport(MetricReport report) { + stats.recordBackendLoadMetricStats(report.getNamedMetrics()); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index 9daa440a3..b86c8110f 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -361,7 +362,16 @@ final class LoadReportClient { .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests()) .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests()) .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress()) - .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())); + .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests()) + .addAllLoadMetricStats( + upstreamLocalityStats.loadMetricStatsMap().entrySet().stream().map( + e -> io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder() + .setMetricName(e.getKey()) + .setNumRequestsFinishedWithMetric( + e.getValue().numRequestsFinishedWithMetric()) + .setTotalMetricValue(e.getValue().totalMetricValue()) + .build()) + .collect(Collectors.toList()))); } for (DroppedRequests droppedRequests : stats.droppedRequestsList()) { builder.addDroppedRequests( diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java index 4bd0ba437..e51c6ecee 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java @@ -23,6 +23,7 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.collect.Sets; import io.grpc.Status; +import io.grpc.xds.Stats.BackendLoadMetricStats; import io.grpc.xds.Stats.ClusterStats; import io.grpc.xds.Stats.DroppedRequests; import io.grpc.xds.Stats.UpstreamLocalityStats; @@ -197,7 +198,7 @@ final class LoadStatsManager2 { } UpstreamLocalityStats upstreamLocalityStats = UpstreamLocalityStats.create( locality, snapshot.callsIssued, snapshot.callsSucceeded, snapshot.callsFailed, - snapshot.callsInProgress); + snapshot.callsInProgress, snapshot.loadMetricStatsMap); builder.addUpstreamLocalityStats(upstreamLocalityStats); // Use the max (drops/loads) recording interval as the overall interval for the // cluster's stats. In general, they should be mostly identical. @@ -322,6 +323,7 @@ final class LoadStatsManager2 { private final AtomicLong callsSucceeded = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong(); private final AtomicLong callsIssued = new AtomicLong(); + private Map<String, BackendLoadMetricStats> loadMetricStatsMap = new HashMap<>(); private ClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality, @@ -354,6 +356,23 @@ final class LoadStatsManager2 { } /** + * Records all custom named backend load metric stats for per-call load reporting. For each + * metric key {@code name}, creates a new {@link BackendLoadMetricStats} with a finished + * requests counter of 1 and the {@code value} if the key is not present in the map. Otherwise, + * increments the finished requests counter and adds the {@code value} to the existing + * {@link BackendLoadMetricStats}. + */ + synchronized void recordBackendLoadMetricStats(Map<String, Double> namedMetrics) { + namedMetrics.forEach((name, value) -> { + if (!loadMetricStatsMap.containsKey(name)) { + loadMetricStatsMap.put(name, new BackendLoadMetricStats(1, value)); + } else { + loadMetricStatsMap.get(name).addMetricValueAndIncrementRequestsFinished(value); + } + }); + } + + /** * Release the <i>hard</i> reference for this stats object (previously obtained via {@link * LoadStatsManager2#getClusterLocalityStats}). The object may still be * recording loads after this method, but there is no guarantee loads recorded after this @@ -367,8 +386,13 @@ final class LoadStatsManager2 { private ClusterLocalityStatsSnapshot snapshot() { long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS); stopwatch.reset().start(); + Map<String, BackendLoadMetricStats> loadMetricStatsMapCopy; + synchronized (this) { + loadMetricStatsMapCopy = Collections.unmodifiableMap(loadMetricStatsMap); + loadMetricStatsMap = new HashMap<>(); + } return new ClusterLocalityStatsSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(), - callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration); + callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration, loadMetricStatsMapCopy); } } @@ -378,15 +402,18 @@ final class LoadStatsManager2 { private final long callsFailed; private final long callsIssued; private final long durationNano; + private final Map<String, BackendLoadMetricStats> loadMetricStatsMap; private ClusterLocalityStatsSnapshot( long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued, - long durationNano) { + long durationNano, Map<String, BackendLoadMetricStats> loadMetricStatsMap) { this.callsSucceeded = callsSucceeded; this.callsInProgress = callsInProgress; this.callsFailed = callsFailed; this.callsIssued = callsIssued; this.durationNano = durationNano; + this.loadMetricStatsMap = Collections.unmodifiableMap( + checkNotNull(loadMetricStatsMap, "loadMetricStatsMap")); } } } diff --git a/xds/src/main/java/io/grpc/xds/Stats.java b/xds/src/main/java/io/grpc/xds/Stats.java index 7e5fa8639..5953f0884 100644 --- a/xds/src/main/java/io/grpc/xds/Stats.java +++ b/xds/src/main/java/io/grpc/xds/Stats.java @@ -18,6 +18,8 @@ package io.grpc.xds; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Map; import javax.annotation.Nullable; /** Represents client load stats. */ @@ -101,10 +103,45 @@ final class Stats { abstract long totalRequestsInProgress(); + abstract ImmutableMap<String, BackendLoadMetricStats> loadMetricStatsMap(); + static UpstreamLocalityStats create(Locality locality, long totalIssuedRequests, - long totalSuccessfulRequests, long totalErrorRequests, long totalRequestsInProgress) { + long totalSuccessfulRequests, long totalErrorRequests, long totalRequestsInProgress, + Map<String, BackendLoadMetricStats> loadMetricStatsMap) { return new AutoValue_Stats_UpstreamLocalityStats(locality, totalIssuedRequests, - totalSuccessfulRequests, totalErrorRequests, totalRequestsInProgress); + totalSuccessfulRequests, totalErrorRequests, totalRequestsInProgress, + ImmutableMap.copyOf(loadMetricStatsMap)); + } + } + + /** + * Load metric stats for multi-dimensional load balancing. + */ + static final class BackendLoadMetricStats { + + private long numRequestsFinishedWithMetric; + private double totalMetricValue; + + BackendLoadMetricStats(long numRequestsFinishedWithMetric, double totalMetricValue) { + this.numRequestsFinishedWithMetric = numRequestsFinishedWithMetric; + this.totalMetricValue = totalMetricValue; + } + + public long numRequestsFinishedWithMetric() { + return numRequestsFinishedWithMetric; + } + + public double totalMetricValue() { + return totalMetricValue; + } + + /** + * Adds the given {@code metricValue} and increments the number of requests finished counter for + * the existing {@link BackendLoadMetricStats}. + */ + public void addMetricValueAndIncrementRequestsFinished(double metricValue) { + numRequestsFinishedWithMetric += 1; + totalMetricValue += metricValue; } } } diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java b/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java index 34922dfc8..1b767e030 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java @@ -120,7 +120,8 @@ public final class OrcaMetricReportingServerInterceptor implements ServerInterce .setRpsFractional(internalReport.getQps()) .setEps(internalReport.getEps()) .putAllUtilization(internalReport.getUtilizationMetrics()) - .putAllRequestCost(internalReport.getRequestCostMetrics()); + .putAllRequestCost(internalReport.getRequestCostMetrics()) + .putAllNamedMetrics(internalReport.getNamedMetrics()); } /** @@ -133,7 +134,8 @@ public final class OrcaMetricReportingServerInterceptor implements ServerInterce MetricReport callMetricRecorderReport ) { metricRecorderReportBuilder.putAllUtilization(callMetricRecorderReport.getUtilizationMetrics()) - .putAllRequestCost(callMetricRecorderReport.getRequestCostMetrics()); + .putAllRequestCost(callMetricRecorderReport.getRequestCostMetrics()) + .putAllNamedMetrics(callMetricRecorderReport.getNamedMetrics()); // Overwrite only if the values from the given MetricReport for CallMetricRecorder are set double cpu = callMetricRecorderReport.getCpuUtilization(); if (isReportValueSet(cpu)) { diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java index 97b98cd4a..814015ba9 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java @@ -256,7 +256,7 @@ public abstract class OrcaPerRequestUtil { return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(), loadReport.getApplicationUtilization(), loadReport.getMemUtilization(), loadReport.getRpsFractional(), loadReport.getEps(), loadReport.getRequestCostMap(), - loadReport.getUtilizationMap()); + loadReport.getUtilizationMap(), loadReport.getNamedMetricsMap()); } /** diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 2ab101b73..7842967c0 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.github.xds.data.orca.v3.OrcaLoadReport; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.grpc.Attributes; @@ -45,6 +46,7 @@ import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.protobuf.ProtoUtils; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; import io.grpc.xds.Endpoints.DropOverload; @@ -88,11 +90,16 @@ import org.mockito.junit.MockitoRule; public class ClusterImplLoadBalancerTest { @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + private static final double TOLERANCE = 1.0e-10; private static final String AUTHORITY = "api.google.com"; private static final String CLUSTER = "cluster-foo.googleapis.com"; private static final String EDS_SERVICE_NAME = "service.googleapis.com"; private static final ServerInfo LRS_SERVER_INFO = ServerInfo.create("api.google.com", InsecureChannelCredentials.create()); + private static final Metadata.Key<OrcaLoadReport> ORCA_ENDPOINT_LOAD_METRICS_KEY = + Metadata.Key.of( + "endpoint-load-metrics-bin", + ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance())); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @Override @@ -255,7 +262,21 @@ public class ClusterImplLoadBalancerTest { ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // second RPC call ClientStreamTracer streamTracer3 = result.getStreamTracerFactory().newClientStreamTracer( ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // third RPC call + // When the trailer contains an ORCA report, the listener callback will be invoked. + Metadata trailersWithOrcaLoadReport1 = new Metadata(); + trailersWithOrcaLoadReport1.put(ORCA_ENDPOINT_LOAD_METRICS_KEY, + OrcaLoadReport.newBuilder().setApplicationUtilization(1.414).setMemUtilization(0.034) + .setRpsFractional(1.414).putNamedMetrics("named1", 3.14159) + .putNamedMetrics("named2", -1.618).build()); + streamTracer1.inboundTrailers(trailersWithOrcaLoadReport1); streamTracer1.streamClosed(Status.OK); + Metadata trailersWithOrcaLoadReport2 = new Metadata(); + trailersWithOrcaLoadReport2.put(ORCA_ENDPOINT_LOAD_METRICS_KEY, + OrcaLoadReport.newBuilder().setApplicationUtilization(0.99).setMemUtilization(0.123) + .setRpsFractional(0.905).putNamedMetrics("named1", 2.718) + .putNamedMetrics("named2", 1.414) + .putNamedMetrics("named3", 0.009).build()); + streamTracer2.inboundTrailers(trailersWithOrcaLoadReport2); streamTracer2.streamClosed(Status.UNAVAILABLE); ClusterStats clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); @@ -266,6 +287,24 @@ public class ClusterImplLoadBalancerTest { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(1L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(1L); + assertThat(localityStats.loadMetricStatsMap().containsKey("named1")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( + 2L); + assertThat(localityStats.loadMetricStatsMap().get("named1").totalMetricValue()).isWithin( + TOLERANCE).of(3.14159 + 2.718); + assertThat(localityStats.loadMetricStatsMap().containsKey("named2")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named2").numRequestsFinishedWithMetric()).isEqualTo( + 2L); + assertThat(localityStats.loadMetricStatsMap().get("named2").totalMetricValue()).isWithin( + TOLERANCE).of(-1.618 + 1.414); + assertThat(localityStats.loadMetricStatsMap().containsKey("named3")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named3").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(localityStats.loadMetricStatsMap().get("named3").totalMetricValue()).isWithin( + TOLERANCE).of(0.009); streamTracer3.streamClosed(Status.OK); subchannel.shutdown(); // stats recorder released @@ -278,6 +317,7 @@ public class ClusterImplLoadBalancerTest { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(0L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(0L); + assertThat(localityStats.loadMetricStatsMap().isEmpty()).isTrue(); clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); assertThat(clusterStats.upstreamLocalityStatsList()).isEmpty(); // no longer reported diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index 3c3a11b3c..910a9fc32 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -35,6 +35,7 @@ import com.google.protobuf.Value; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; +import io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats; import io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats; import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; @@ -198,11 +199,15 @@ public class LoadReportClientTest { for (int i = 0; i < 31; i++) { localityStats1.recordCallStarted(); } + localityStats1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 3.14159)); + localityStats1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 1.618)); + localityStats1.recordBackendLoadMetricStats(ImmutableMap.of("named1", -2.718)); ClusterLocalityStats localityStats2 = loadStatsManager.getClusterLocalityStats(CLUSTER2, EDS_SERVICE_NAME2, LOCALITY2); for (int i = 0; i < 45; i++) { localityStats2.recordCallStarted(); } + localityStats2.recordBackendLoadMetricStats(ImmutableMap.of("named2", 1.414)); localityStats2.recordCallFinished(Status.OK); } @@ -245,6 +250,12 @@ public class LoadReportClientTest { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats.getLoadMetricStatsCount()).isEqualTo(1); + EndpointLoadMetricStats loadMetricStats = Iterables.getOnlyElement( + localityStats.getLoadMetricStatsList()); + assertThat(loadMetricStats.getMetricName()).isEqualTo("named1"); + assertThat(loadMetricStats.getNumRequestsFinishedWithMetric()).isEqualTo(3L); + assertThat(loadMetricStats.getTotalMetricValue()).isEqualTo(3.14159 + 1.618 - 2.718); fakeClock.forwardTime(10L, TimeUnit.SECONDS); verify(requestObserver, times(3)).onNext(requestCaptor.capture()); @@ -263,6 +274,7 @@ public class LoadReportClientTest { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats.getLoadMetricStatsList()).isEmpty(); // Management server updates the interval of sending load reports, while still asking for // loads to cluster1 only. @@ -287,6 +299,7 @@ public class LoadReportClientTest { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats.getLoadMetricStatsList()).isEmpty(); // Management server asks to report loads for all clusters. responseObserver.onNext(LoadStatsResponse.newBuilder().setSendAllClusters(true) @@ -309,6 +322,7 @@ public class LoadReportClientTest { assertThat(localityStats1.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats1.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats1.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats1.getLoadMetricStatsList()).isEmpty(); ClusterStats clusterStats2 = findClusterStats(request.getClusterStatsList(), CLUSTER2); assertThat(Durations.toSeconds(clusterStats2.getLoadReportInterval())) .isEqualTo(10L + 10L + 20L + 20L); @@ -326,6 +340,12 @@ public class LoadReportClientTest { assertThat(localityStats2.getTotalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats2.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats2.getTotalRequestsInProgress()).isEqualTo(45L - 1L); + assertThat(localityStats2.getLoadMetricStatsCount()).isEqualTo(1); + EndpointLoadMetricStats loadMetricStats2 = Iterables.getOnlyElement( + localityStats2.getLoadMetricStatsList()); + assertThat(loadMetricStats2.getMetricName()).isEqualTo("named2"); + assertThat(loadMetricStats2.getNumRequestsFinishedWithMetric()).isEqualTo(1L); + assertThat(loadMetricStats2.getTotalMetricValue()).isEqualTo(1.414); // Load reports for cluster1 is no longer wanted. responseObserver.onNext(LoadStatsResponse.newBuilder().addClusters(CLUSTER2) @@ -348,6 +368,7 @@ public class LoadReportClientTest { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(44L); + assertThat(localityStats.getLoadMetricStatsList()).isEmpty(); fakeClock.forwardTime(10L, TimeUnit.SECONDS); verify(requestObserver, times(7)).onNext(requestCaptor.capture()); @@ -366,6 +387,7 @@ public class LoadReportClientTest { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(44L); + assertThat(localityStats.getLoadMetricStatsList()).isEmpty(); // Management server asks loads for a cluster that client has no load data. responseObserver.onNext(LoadStatsResponse.newBuilder().addClusters("unknown.googleapis.com") @@ -495,6 +517,12 @@ public class LoadReportClientTest { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats.getLoadMetricStatsCount()).isEqualTo(1); + EndpointLoadMetricStats loadMetricStats = Iterables.getOnlyElement( + localityStats.getLoadMetricStatsList()); + assertThat(loadMetricStats.getMetricName()).isEqualTo("named1"); + assertThat(loadMetricStats.getNumRequestsFinishedWithMetric()).isEqualTo(3L); + assertThat(loadMetricStats.getTotalMetricValue()).isEqualTo(3.14159 + 1.618 - 2.718); // Wrapping up verify(backoffPolicyProvider, times(2)).get(); diff --git a/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java b/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java index 0cfb7f46a..0389fa74a 100644 --- a/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java +++ b/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java @@ -18,6 +18,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.grpc.Status; import io.grpc.internal.FakeClock; @@ -39,6 +40,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class LoadStatsManager2Test { + private static final double TOLERANCE = 1.0e-10; private static final String CLUSTER_NAME1 = "cluster-foo.googleapis.com"; private static final String CLUSTER_NAME2 = "cluster-bar.googleapis.com"; private static final String EDS_SERVICE_NAME1 = "backend-service-foo.googleapis.com"; @@ -71,12 +73,17 @@ public class LoadStatsManager2Test { for (int i = 0; i < 19; i++) { loadCounter1.recordCallStarted(); } + loadCounter1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 3.14159)); + loadCounter1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 1.618)); + loadCounter1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 99.0)); + loadCounter1.recordBackendLoadMetricStats(ImmutableMap.of("named1", -97.23, "named2", -2.718)); fakeClock.forwardTime(5L, TimeUnit.SECONDS); dropCounter2.recordDroppedRequest(); loadCounter1.recordCallFinished(Status.OK); for (int i = 0; i < 9; i++) { loadCounter2.recordCallStarted(); } + loadCounter2.recordBackendLoadMetricStats(ImmutableMap.of("named3", 0.0009)); loadCounter2.recordCallFinished(Status.UNAVAILABLE); fakeClock.forwardTime(10L, TimeUnit.SECONDS); loadCounter3.recordCallStarted(); @@ -96,6 +103,18 @@ public class LoadStatsManager2Test { assertThat(loadStats1.totalSuccessfulRequests()).isEqualTo(1L); assertThat(loadStats1.totalErrorRequests()).isEqualTo(0L); assertThat(loadStats1.totalRequestsInProgress()).isEqualTo(19L - 1L); + assertThat(loadStats1.loadMetricStatsMap().containsKey("named1")).isTrue(); + assertThat(loadStats1.loadMetricStatsMap().containsKey("named2")).isTrue(); + assertThat( + loadStats1.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( + 4L); + assertThat(loadStats1.loadMetricStatsMap().get("named1").totalMetricValue()).isWithin(TOLERANCE) + .of(3.14159 + 1.618 + 99 - 97.23); + assertThat( + loadStats1.loadMetricStatsMap().get("named2").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(loadStats1.loadMetricStatsMap().get("named2").totalMetricValue()).isWithin(TOLERANCE) + .of(-2.718); UpstreamLocalityStats loadStats2 = findLocalityStats(stats1.upstreamLocalityStatsList(), LOCALITY2); @@ -103,6 +122,12 @@ public class LoadStatsManager2Test { assertThat(loadStats2.totalSuccessfulRequests()).isEqualTo(0L); assertThat(loadStats2.totalErrorRequests()).isEqualTo(1L); assertThat(loadStats2.totalRequestsInProgress()).isEqualTo(9L - 1L); + assertThat(loadStats2.loadMetricStatsMap().containsKey("named3")).isTrue(); + assertThat( + loadStats2.loadMetricStatsMap().get("named3").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(loadStats2.loadMetricStatsMap().get("named3").totalMetricValue()).isWithin(TOLERANCE) + .of(0.0009); ClusterStats stats2 = findClusterStats(allStats, CLUSTER_NAME1, EDS_SERVICE_NAME2); assertThat(stats2.loadReportIntervalNano()).isEqualTo(TimeUnit.SECONDS.toNanos(5L + 10L)); @@ -121,6 +146,7 @@ public class LoadStatsManager2Test { assertThat(loadStats3.totalSuccessfulRequests()).isEqualTo(0L); assertThat(loadStats3.totalErrorRequests()).isEqualTo(0L); assertThat(loadStats3.totalRequestsInProgress()).isEqualTo(1L); + assertThat(loadStats3.loadMetricStatsMap()).isEmpty(); fakeClock.forwardTime(3L, TimeUnit.SECONDS); List<ClusterStats> clusterStatsList = loadStatsManager.getClusterStatsReports(CLUSTER_NAME1); @@ -135,11 +161,13 @@ public class LoadStatsManager2Test { assertThat(loadStats1.totalSuccessfulRequests()).isEqualTo(0L); assertThat(loadStats1.totalErrorRequests()).isEqualTo(0L); assertThat(loadStats1.totalRequestsInProgress()).isEqualTo(18L); // still in-progress + assertThat(loadStats1.loadMetricStatsMap()).isEmpty(); loadStats2 = findLocalityStats(stats1.upstreamLocalityStatsList(), LOCALITY2); assertThat(loadStats2.totalIssuedRequests()).isEqualTo(0L); assertThat(loadStats2.totalSuccessfulRequests()).isEqualTo(0L); assertThat(loadStats2.totalErrorRequests()).isEqualTo(0L); assertThat(loadStats2.totalRequestsInProgress()).isEqualTo(8L); // still in-progress + assertThat(loadStats2.loadMetricStatsMap()).isEmpty(); stats2 = findClusterStats(clusterStatsList, CLUSTER_NAME1, EDS_SERVICE_NAME2); assertThat(stats2.loadReportIntervalNano()).isEqualTo(TimeUnit.SECONDS.toNanos(3L)); @@ -194,9 +222,12 @@ public class LoadStatsManager2Test { ClusterLocalityStats ref2 = loadStatsManager.getClusterLocalityStats( CLUSTER_NAME1, EDS_SERVICE_NAME1, LOCALITY1); ref1.recordCallStarted(); + ref1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 1.618)); + ref1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 3.14159)); ref1.recordCallFinished(Status.OK); ref2.recordCallStarted(); ref2.recordCallStarted(); + ref2.recordBackendLoadMetricStats(ImmutableMap.of("named1", -1.0, "named2", 2.718)); ref2.recordCallFinished(Status.UNAVAILABLE); ClusterStats stats = Iterables.getOnlyElement( @@ -207,6 +238,18 @@ public class LoadStatsManager2Test { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(1L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(1L + 2L - 1L - 1L); + assertThat(localityStats.loadMetricStatsMap().containsKey("named1")).isTrue(); + assertThat(localityStats.loadMetricStatsMap().containsKey("named2")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( + 3L); + assertThat(localityStats.loadMetricStatsMap().get("named1").totalMetricValue()).isWithin( + TOLERANCE).of(1.618 + 3.14159 - 1); + assertThat( + localityStats.loadMetricStatsMap().get("named2").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(localityStats.loadMetricStatsMap().get("named2").totalMetricValue()).isEqualTo( + 2.718); } @Test @@ -215,6 +258,8 @@ public class LoadStatsManager2Test { CLUSTER_NAME1, EDS_SERVICE_NAME1, LOCALITY1); counter.recordCallStarted(); counter.recordCallStarted(); + counter.recordBackendLoadMetricStats(ImmutableMap.of("named1", 2.718)); + counter.recordBackendLoadMetricStats(ImmutableMap.of("named1", 1.414)); ClusterStats stats = Iterables.getOnlyElement( loadStatsManager.getClusterStatsReports(CLUSTER_NAME1)); @@ -224,6 +269,12 @@ public class LoadStatsManager2Test { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.totalErrorRequests()).isEqualTo(0L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(2L); + assertThat(localityStats.loadMetricStatsMap().containsKey("named1")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( + 2L); + assertThat(localityStats.loadMetricStatsMap().get("named1").totalMetricValue()).isEqualTo( + 2.718 + 1.414); // release the counter, but requests still in-flight counter.release(); @@ -234,6 +285,7 @@ public class LoadStatsManager2Test { assertThat(localityStats.totalErrorRequests()).isEqualTo(0L); assertThat(localityStats.totalRequestsInProgress()) .isEqualTo(2L); // retained by in-flight calls + assertThat(localityStats.loadMetricStatsMap().isEmpty()).isTrue(); counter.recordCallFinished(Status.OK); counter.recordCallFinished(Status.UNAVAILABLE); @@ -243,6 +295,7 @@ public class LoadStatsManager2Test { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(1L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(0L); + assertThat(localityStats.loadMetricStatsMap().isEmpty()).isTrue(); assertThat(loadStatsManager.getClusterStatsReports(CLUSTER_NAME1)).isEmpty(); } diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index 58a19af96..879cac871 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -214,10 +214,10 @@ public class WeightedRoundRobinLoadBalancerTest { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -260,10 +260,10 @@ public class WeightedRoundRobinLoadBalancerTest { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.9, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.9, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); PickResult pickResult = weightedPicker.pickSubchannel(mockArgs); assertThat(pickResult.getSubchannel()).isEqualTo(weightedSubchannel1); @@ -340,11 +340,11 @@ public class WeightedRoundRobinLoadBalancerTest { @Test public void pickByWeight_largeWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 999, 0, new HashMap<>(), new HashMap<>()); + 0.1, 0, 0.1, 999, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.9, 0, 0.1, 2, 0, new HashMap<>(), new HashMap<>()); + 0.9, 0, 0.1, 2, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); double totalWeight = 999 / 0.1 + 2 / 0.9 + 100 / 0.86; pickByWeight(report1, report2, report3, 999 / 0.1 / totalWeight, 2 / 0.9 / totalWeight, @@ -354,11 +354,11 @@ public class WeightedRoundRobinLoadBalancerTest { @Test public void pickByWeight_largeWeight_useApplicationUtilization() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.44, 0.1, 0.1, 999, 0, new HashMap<>(), new HashMap<>()); + 0.44, 0.1, 0.1, 999, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0.9, 0.1, 2, 0, new HashMap<>(), new HashMap<>()); + 0.12, 0.9, 0.1, 2, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.33, 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); + 0.33, 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); double totalWeight = 999 / 0.1 + 2 / 0.9 + 100 / 0.86; pickByWeight(report1, report2, report3, 999 / 0.1 / totalWeight, 2 / 0.9 / totalWeight, @@ -368,11 +368,11 @@ public class WeightedRoundRobinLoadBalancerTest { @Test public void pickByWeight_largeWeight_withEps_defaultErrorUtilizationPenalty() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 999, 13, new HashMap<>(), new HashMap<>()); + 0.1, 0, 0.1, 999, 13, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.9, 0, 0.1, 2, 1.8, new HashMap<>(), new HashMap<>()); + 0.9, 0, 0.1, 2, 1.8, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 3, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 3, new HashMap<>(), new HashMap<>(), new HashMap<>()); double weight1 = 999 / (0.1 + 13 / 999F * weightedConfig.errorUtilizationPenalty); double weight2 = 2 / (0.9 + 1.8 / 2F * weightedConfig.errorUtilizationPenalty); double weight3 = 100 / (0.86 + 3 / 100F * weightedConfig.errorUtilizationPenalty); @@ -385,11 +385,11 @@ public class WeightedRoundRobinLoadBalancerTest { @Test public void pickByWeight_normalWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0, 0.1, 22, 0, new HashMap<>(), new HashMap<>()); + 0.12, 0, 0.1, 22, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.28, 0, 0.1, 40, 0, new HashMap<>(), new HashMap<>()); + 0.28, 0, 0.1, 40, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); double totalWeight = 22 / 0.12 + 40 / 0.28 + 100 / 0.86; pickByWeight(report1, report2, report3, 22 / 0.12 / totalWeight, 40 / 0.28 / totalWeight, 100 / 0.86 / totalWeight @@ -399,11 +399,11 @@ public class WeightedRoundRobinLoadBalancerTest { @Test public void pickByWeight_normalWeight_useApplicationUtilization() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.72, 0.12, 0.1, 22, 0, new HashMap<>(), new HashMap<>()); + 0.72, 0.12, 0.1, 22, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.98, 0.28, 0.1, 40, 0, new HashMap<>(), new HashMap<>()); + 0.98, 0.28, 0.1, 40, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.99, 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); + 0.99, 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); double totalWeight = 22 / 0.12 + 40 / 0.28 + 100 / 0.86; pickByWeight(report1, report2, report3, 22 / 0.12 / totalWeight, 40 / 0.28 / totalWeight, 100 / 0.86 / totalWeight @@ -413,11 +413,11 @@ public class WeightedRoundRobinLoadBalancerTest { @Test public void pickByWeight_normalWeight_withEps_defaultErrorUtilizationPenalty() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + 0.12, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.28, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + 0.28, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>(), new HashMap<>()); double weight1 = 22 / (0.12 + 19.7 / 22F * weightedConfig.errorUtilizationPenalty); double weight2 = 40 / (0.28 + 0.998 / 40F * weightedConfig.errorUtilizationPenalty); double weight3 = 100 / (0.86 + 3.14159 / 100F * weightedConfig.errorUtilizationPenalty); @@ -433,11 +433,11 @@ public class WeightedRoundRobinLoadBalancerTest { .setErrorUtilizationPenalty(1.75F).build(); MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + 0.12, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.28, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + 0.28, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>(), new HashMap<>()); double weight1 = 22 / (0.12 + 19.7 / 22F * weightedConfig.errorUtilizationPenalty); double weight2 = 40 / (0.28 + 0.998 / 40F * weightedConfig.errorUtilizationPenalty); double weight3 = 100 / (0.86 + 3.14159 / 100F * weightedConfig.errorUtilizationPenalty); @@ -453,11 +453,11 @@ public class WeightedRoundRobinLoadBalancerTest { .setErrorUtilizationPenalty(1.75F).build(); MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + 0, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + 0, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + 0, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>(), new HashMap<>()); double avgSubchannelPickRatio = 1.0 / 3; pickByWeight(report1, report2, report3, avgSubchannelPickRatio, avgSubchannelPickRatio, @@ -508,10 +508,10 @@ public class WeightedRoundRobinLoadBalancerTest { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(5, TimeUnit.SECONDS)).isEqualTo(1); Map<Subchannel, Integer> pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -568,10 +568,10 @@ public class WeightedRoundRobinLoadBalancerTest { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -585,10 +585,10 @@ public class WeightedRoundRobinLoadBalancerTest { assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); //timer fires, new weight updated assertThat(fakeClock.forwardTime(500, TimeUnit.MILLISECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) @@ -620,10 +620,10 @@ public class WeightedRoundRobinLoadBalancerTest { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map<Subchannel, Integer> pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -685,7 +685,7 @@ public class WeightedRoundRobinLoadBalancerTest { subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.1, 0, 0.1, qpsByChannel.get(subchannel), 0, - new HashMap<>(), new HashMap<>())); + new HashMap<>(), new HashMap<>(), new HashMap<>())); } assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0 - 1.0 / 2)) .isAtMost(0.1); @@ -701,7 +701,7 @@ public class WeightedRoundRobinLoadBalancerTest { subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.1, 0, 0.1, qpsByChannel.get(subchannel), 0, - new HashMap<>(), new HashMap<>())); + new HashMap<>(), new HashMap<>(), new HashMap<>())); fakeClock.forwardTime(50, TimeUnit.MILLISECONDS); } assertThat(pickCount.size()).isEqualTo(2); @@ -739,10 +739,10 @@ public class WeightedRoundRobinLoadBalancerTest { WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map<Subchannel, Integer> pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -783,10 +783,10 @@ public class WeightedRoundRobinLoadBalancerTest { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); CyclicBarrier barrier = new CyclicBarrier(2); Map<Subchannel, AtomicInteger> pickCount = new ConcurrentHashMap<>(); pickCount.put(weightedSubchannel1, new AtomicInteger(0)); diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java index 00562be3d..469cd9363 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java @@ -72,6 +72,7 @@ public class OrcaMetricReportingServerInterceptorTest { private final Map<String, Double> applicationUtilizationMetricsMap = new HashMap<>(); private final Map<String, Double> applicationCostMetrics = new HashMap<>(); + private final Map<String, Double> applicationNamedMetrics = new HashMap<>(); private double cpuUtilizationMetrics = 0; private double applicationUtilizationMetrics = 0; private double memoryUtilizationMetrics = 0; @@ -98,6 +99,9 @@ public class OrcaMetricReportingServerInterceptorTest { CallMetricRecorder.getCurrent().recordRequestCostMetric(entry.getKey(), entry.getValue()); } + for (Map.Entry<String, Double> entry : applicationNamedMetrics.entrySet()) { + CallMetricRecorder.getCurrent().recordNamedMetric(entry.getKey(), entry.getValue()); + } CallMetricRecorder.getCurrent().recordCpuUtilizationMetric(cpuUtilizationMetrics); CallMetricRecorder.getCurrent() .recordApplicationUtilizationMetric(applicationUtilizationMetrics); @@ -196,6 +200,9 @@ public class OrcaMetricReportingServerInterceptorTest { applicationUtilizationMetricsMap.put("util1", 0.1082); applicationUtilizationMetricsMap.put("util2", 0.4936); applicationUtilizationMetricsMap.put("util3", 0.5342); + applicationNamedMetrics.put("named1", 0.777); + applicationNamedMetrics.put("named2", 737.747); + applicationNamedMetrics.put("named3", -0.380); cpuUtilizationMetrics = 0.3465; applicationUtilizationMetrics = 0.99887; memoryUtilizationMetrics = 0.764; @@ -209,6 +216,8 @@ public class OrcaMetricReportingServerInterceptorTest { .containsExactly("util1", 0.1082, "util2", 0.4936, "util3", 0.5342); assertThat(report.getRequestCostMap()) .containsExactly("cost1", 1231.4543, "cost2", 0.1367, "cost3", 7614.145); + assertThat(report.getNamedMetricsMap()) + .containsExactly("named1", 0.777, "named2", 737.747, "named3", -0.380); assertThat(report.getCpuUtilization()).isEqualTo(0.3465); assertThat(report.getApplicationUtilization()).isEqualTo(0.99887); assertThat(report.getMemUtilization()).isEqualTo(0.764); @@ -221,6 +230,9 @@ public class OrcaMetricReportingServerInterceptorTest { applicationUtilizationMetricsMap.put("util1", 0.1482); applicationUtilizationMetricsMap.put("util2", 0.4036); applicationUtilizationMetricsMap.put("util3", 0.5742); + applicationNamedMetrics.put("named1", 0.777); + applicationNamedMetrics.put("named2", 737.747); + applicationNamedMetrics.put("named3", -0.380); cpuUtilizationMetrics = 0.3465; memoryUtilizationMetrics = 0.967; metricRecorder.setApplicationUtilizationMetric(2.718); @@ -240,6 +252,8 @@ public class OrcaMetricReportingServerInterceptorTest { assertThat(report.getUtilizationMap()) .containsExactly("util1", 0.1482, "util2", 0.4036, "util3", 0.5742, "serverUtil1", 0.7467, "serverUtil2", 0.2233); + assertThat(report.getNamedMetricsMap()) + .containsExactly("named1", 0.777, "named2", 737.747, "named3", -0.380); assertThat(report.getRequestCostMap()).isEmpty(); assertThat(report.getCpuUtilization()).isEqualTo(0.3465); assertThat(report.getApplicationUtilization()).isEqualTo(2.718); diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java index ef06e6fcc..4d0805029 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java @@ -124,7 +124,8 @@ public class OrcaPerRequestUtilTest { && a.getQps() == b.getQps() && a.getEps() == b.getEps() && Objects.equal(a.getRequestCostMetrics(), b.getRequestCostMetrics()) - && Objects.equal(a.getUtilizationMetrics(), b.getUtilizationMetrics()); + && Objects.equal(a.getUtilizationMetrics(), b.getUtilizationMetrics()) + && Objects.equal(a.getNamedMetrics(), b.getNamedMetrics()); } /** |