diff options
8 files changed, 324 insertions, 128 deletions
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 5a01e2ade..41bd6b8e1 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -71,7 +71,8 @@ public final class InProcessChannelBuilder extends this.name = Preconditions.checkNotNull(name, "name"); // In-process transport should not record its traffic to the stats module. // https://github.com/grpc/grpc-java/issues/2284 - setRecordStats(false); + setStatsRecordStartedRpcs(false); + setStatsRecordFinishedRpcs(false); } @Override diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java index aec1df43c..85f7813a8 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java @@ -81,7 +81,8 @@ public final class InProcessServerBuilder this.name = Preconditions.checkNotNull(name, "name"); // In-process transport should not record its traffic to the stats module. // https://github.com/grpc/grpc-java/issues/2284 - setRecordStats(false); + setStatsRecordStartedRpcs(false); + setStatsRecordFinishedRpcs(false); } @Override diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index ebbe2ca3a..50b217c56 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -144,7 +144,8 @@ public abstract class AbstractManagedChannelImplBuilder } private boolean statsEnabled = true; - private boolean recordStats = true; + private boolean recordStartedRpcs = true; + private boolean recordFinishedRpcs = true; private boolean tracingEnabled = true; @Nullable @@ -296,11 +297,19 @@ public abstract class AbstractManagedChannelImplBuilder } /** - * Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true. - * Enabled by default. + * Disable or enable stats recording for RPC upstarts. Effective only if {@link + * #setStatsEnabled} is set to true. Enabled by default. */ - protected void setRecordStats(boolean value) { - recordStats = value; + protected void setStatsRecordStartedRpcs(boolean value) { + recordStartedRpcs = value; + } + + /** + * Disable or enable stats recording for RPC completions. Effective only if {@link + * #setStatsEnabled} is set to true. Enabled by default. + */ + protected void setStatsRecordFinishedRpcs(boolean value) { + recordFinishedRpcs = value; } /** @@ -348,7 +357,8 @@ public abstract class AbstractManagedChannelImplBuilder } // First interceptor runs last (see ClientInterceptors.intercept()), so that no // other interceptor can override the tracer factory we set in CallOptions. - effectiveInterceptors.add(0, censusStats.getClientInterceptor(recordStats)); + effectiveInterceptors.add( + 0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs)); } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 0280d51d5..59e8a4db4 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -103,7 +103,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil private CensusStatsModule censusStatsOverride; private boolean statsEnabled = true; - private boolean recordStats = true; + private boolean recordStartedRpcs = true; + private boolean recordFinishedRpcs = true; private boolean tracingEnabled = true; @Override @@ -207,11 +208,19 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil } /** - * Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true. - * Enabled by default. + * Disable or enable stats recording for RPC upstarts. Effective only if {@link + * #setStatsEnabled} is set to true. Enabled by default. */ - protected void setRecordStats(boolean value) { - recordStats = value; + protected void setStatsRecordStartedRpcs(boolean value) { + recordStartedRpcs = value; + } + + /** + * Disable or enable stats recording for RPC completions. Effective only if {@link + * #setStatsEnabled} is set to true. Enabled by default. + */ + protected void setStatsRecordFinishedRpcs(boolean value) { + recordFinishedRpcs = value; } /** @@ -242,7 +251,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil if (censusStats == null) { censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true); } - tracerFactories.add(censusStats.getServerTracerFactory(recordStats)); + tracerFactories.add( + censusStats.getServerTracerFactory(recordStartedRpcs, recordFinishedRpcs)); } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 524f3a399..37c658af6 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -16,7 +16,6 @@ package io.grpc.internal; -import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; @@ -53,7 +52,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Provides factories for {@link StreamTracer} that records stats to Census. @@ -133,22 +131,25 @@ public final class CensusStatsModule { */ @VisibleForTesting ClientCallTracer newClientCallTracer( - TagContext parentCtx, String fullMethodName, boolean recordStats) { - return new ClientCallTracer(this, parentCtx, fullMethodName, recordStats); + TagContext parentCtx, String fullMethodName, + boolean recordStartedRpcs, boolean recordFinishedRpcs) { + return new ClientCallTracer( + this, parentCtx, fullMethodName, recordStartedRpcs, recordFinishedRpcs); } /** * Returns the server tracer factory. */ - ServerStreamTracer.Factory getServerTracerFactory(boolean recordStats) { - return new ServerTracerFactory(recordStats); + ServerStreamTracer.Factory getServerTracerFactory( + boolean recordStartedRpcs, boolean recordFinishedRpcs) { + return new ServerTracerFactory(recordStartedRpcs, recordFinishedRpcs); } /** * Returns the client interceptor that facilitates Census-based stats reporting. */ - ClientInterceptor getClientInterceptor(boolean recordStats) { - return new StatsClientInterceptor(recordStats); + ClientInterceptor getClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) { + return new StatsClientInterceptor(recordStartedRpcs, recordFinishedRpcs); } private static final class ClientTracer extends ClientStreamTracer { @@ -221,18 +222,27 @@ public final class CensusStatsModule { private volatile ClientTracer streamTracer; private volatile int callEnded; private final TagContext parentCtx; - private final boolean recordStats; + private final TagContext startCtx; + private final boolean recordFinishedRpcs; ClientCallTracer( CensusStatsModule module, TagContext parentCtx, String fullMethodName, - boolean recordStats) { + boolean recordStartedRpcs, + boolean recordFinishedRpcs) { this.module = module; - this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); + this.parentCtx = checkNotNull(parentCtx); + this.startCtx = + module.tagger.toBuilder(parentCtx) + .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)).build(); this.stopwatch = module.stopwatchSupplier.get().start(); - this.recordStats = recordStats; + this.recordFinishedRpcs = recordFinishedRpcs; + if (recordStartedRpcs) { + module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT, 1) + .record(startCtx); + } } @Override @@ -262,7 +272,7 @@ public final class CensusStatsModule { if (callEndedUpdater.getAndSet(this, 1) != 0) { return; } - if (!recordStats) { + if (!recordFinishedRpcs) { return; } stopwatch.stop(); @@ -272,7 +282,8 @@ public final class CensusStatsModule { tracer = BLANK_CLIENT_TRACER; } MeasureMap measureMap = module.statsRecorder.newMeasureMap() - // The metrics are in double + .put(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT, 1) + // The latency is double value .put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) .put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) .put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) @@ -290,8 +301,7 @@ public final class CensusStatsModule { measureMap.record( module .tagger - .toBuilder(parentCtx) - .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)) + .toBuilder(startCtx) .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) .build()); } @@ -315,12 +325,11 @@ public final class CensusStatsModule { private final CensusStatsModule module; private final String fullMethodName; - @Nullable private final TagContext parentCtx; private volatile int streamClosed; private final Stopwatch stopwatch; private final Tagger tagger; - private final boolean recordStats; + private final boolean recordFinishedRpcs; private volatile long outboundMessageCount; private volatile long inboundMessageCount; private volatile long outboundWireSize; @@ -334,13 +343,18 @@ public final class CensusStatsModule { TagContext parentCtx, Supplier<Stopwatch> stopwatchSupplier, Tagger tagger, - boolean recordStats) { + boolean recordStartedRpcs, + boolean recordFinishedRpcs) { this.module = module; this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.stopwatch = stopwatchSupplier.get().start(); this.tagger = tagger; - this.recordStats = recordStats; + this.recordFinishedRpcs = recordFinishedRpcs; + if (recordStartedRpcs) { + module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_SERVER_STARTED_COUNT, 1) + .record(parentCtx); + } } @Override @@ -384,13 +398,14 @@ public final class CensusStatsModule { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; } - if (!recordStats) { + if (!recordFinishedRpcs) { return; } stopwatch.stop(); long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); MeasureMap measureMap = module.statsRecorder.newMeasureMap() - // The metrics are in double + .put(RpcMeasureConstants.RPC_SERVER_FINISHED_COUNT, 1) + // The latency is double value .put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) .put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) .put(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount) @@ -401,11 +416,10 @@ public final class CensusStatsModule { if (!status.isOk()) { measureMap.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1); } - TagContext ctx = firstNonNull(parentCtx, tagger.empty()); measureMap.record( module .tagger - .toBuilder(ctx) + .toBuilder(parentCtx) .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) .build()); } @@ -421,10 +435,12 @@ public final class CensusStatsModule { @VisibleForTesting final class ServerTracerFactory extends ServerStreamTracer.Factory { - private final boolean recordStats; + private final boolean recordStartedRpcs; + private final boolean recordFinishedRpcs; - ServerTracerFactory(boolean recordStats) { - this.recordStats = recordStats; + ServerTracerFactory(boolean recordStartedRpcs, boolean recordFinishedRpcs) { + this.recordStartedRpcs = recordStartedRpcs; + this.recordFinishedRpcs = recordFinishedRpcs; } @Override @@ -444,16 +460,19 @@ public final class CensusStatsModule { parentCtx, stopwatchSupplier, tagger, - recordStats); + recordStartedRpcs, + recordFinishedRpcs); } } @VisibleForTesting final class StatsClientInterceptor implements ClientInterceptor { - private final boolean recordStats; + private final boolean recordStartedRpcs; + private final boolean recordFinishedRpcs; - StatsClientInterceptor(boolean recordStats) { - this.recordStats = recordStats; + StatsClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) { + this.recordStartedRpcs = recordStartedRpcs; + this.recordFinishedRpcs = recordFinishedRpcs; } @Override @@ -462,7 +481,8 @@ public final class CensusStatsModule { // New RPCs on client-side inherit the tag context from the current Context. TagContext parentCtx = tagger.getCurrentTagContext(); final ClientCallTracer tracerFactory = - newClientCallTracer(parentCtx, method.getFullMethodName(), recordStats); + newClientCallTracer(parentCtx, method.getFullMethodName(), + recordStartedRpcs, recordFinishedRpcs); ClientCall<ReqT, RespT> call = next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory)); return new SimpleForwardingClientCall<ReqT, RespT>(call) { diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index d77b268ce..89b2da42d 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -240,7 +240,7 @@ public class CensusModulesTest { Channel interceptedChannel = ClientInterceptors.intercept( grpcServerRule.getChannel(), callOptionsCaptureInterceptor, - censusStats.getClientInterceptor(true), censusTracing.getClientInterceptor()); + censusStats.getClientInterceptor(true, true), censusTracing.getClientInterceptor()); ClientCall<String, String> call; if (nonDefaultContext) { Context ctx = @@ -275,7 +275,20 @@ public class CensusModulesTest { // Make the call Metadata headers = new Metadata(); call.start(mockClientCallListener, headers); - assertNull(statsRecorder.pollRecord()); + + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + assertNotNull(record); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + if (nonDefaultContext) { + TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG); + assertEquals("extra value", extraTag.asString()); + assertEquals(2, record.tags.size()); + } else { + assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG)); + assertEquals(1, record.tags.size()); + } + if (nonDefaultContext) { verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), same(fakeClientParentSpan)); @@ -297,9 +310,9 @@ public class CensusModulesTest { assertEquals("No you don't", status.getDescription()); // The intercepting listener calls callEnded() on ClientCallTracer, which records to Census. - StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + record = statsRecorder.pollRecord(); assertNotNull(record); - TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), methodTag.asString()); TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.asString()); @@ -320,12 +333,44 @@ public class CensusModulesTest { } @Test - public void clientBasicStatsDefaultContext() { + public void clientBasicStatsDefaultContext_startsAndFinishes() { + subtestClientBasicStatsDefaultContext(true, true); + } + + @Test + public void clientBasicStatsDefaultContext_startsOnly() { + subtestClientBasicStatsDefaultContext(true, false); + } + + @Test + public void clientBasicStatsDefaultContext_finishesOnly() { + subtestClientBasicStatsDefaultContext(false, true); + } + + @Test + public void clientBasicStatsDefaultContext_neither() { + subtestClientBasicStatsDefaultContext(false, true); + } + + private void subtestClientBasicStatsDefaultContext(boolean recordStarts, boolean recordFinishes) { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true); + censusStats.newClientCallTracer( + tagger.empty(), method.getFullMethodName(), recordStarts, recordFinishes); Metadata headers = new Metadata(); ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); + if (recordStarts) { + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + assertNotNull(record); + assertNoServerContent(record); + assertEquals(1, record.tags.size()); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT)); + } else { + assertNull(statsRecorder.pollRecord()); + } + fakeClock.forwardTime(30, MILLISECONDS); tracer.outboundHeaders(); @@ -349,27 +394,32 @@ public class CensusModulesTest { tracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); - StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); - assertNotNull(record); - assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); - assertEquals(method.getFullMethodName(), methodTag.asString()); - TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), statusTag.asString()); - assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); - assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); - assertEquals( - 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); - assertEquals( - 1128 + 865, - record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals( - 33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); - assertEquals(67 + 552, - record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); - assertEquals(30 + 100 + 16 + 24, - record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + if (recordFinishes) { + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + assertNotNull(record); + assertNoServerContent(record); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128 + 865, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + 33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertEquals(67 + 552, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals(30 + 100 + 16 + 24, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + } else { + assertNull(statsRecorder.pollRecord()); + } } @Test @@ -433,18 +483,29 @@ public class CensusModulesTest { public void clientStreamNeverCreatedStillRecordStats() { CensusStatsModule.ClientCallTracer callTracer = censusStats.newClientCallTracer( - tagger.empty(), method.getFullMethodName(), true); + tagger.empty(), method.getFullMethodName(), true, true); fakeClock.forwardTime(3000, MILLISECONDS); callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); + // Upstart record StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); + assertEquals(1, record.tags.size()); TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT)); + + // Completion record + record = statsRecorder.pollRecord(); + assertNotNull(record); + assertNoServerContent(record); + methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT)); assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); @@ -513,9 +574,21 @@ public class CensusModulesTest { propagate); Metadata headers = new Metadata(); CensusStatsModule.ClientCallTracer callTracer = - census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats); + census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats, recordStats); // This propagates clientCtx to headers if propagates==true callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); + if (recordStats) { + // Client upstart record + StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord(); + assertNotNull(clientRecord); + assertNoServerContent(clientRecord); + assertEquals(2, clientRecord.tags.size()); + TagValue clientMethodTag = clientRecord.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), clientMethodTag.asString()); + TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); + assertEquals("extra-tag-value-897", clientPropagatedTag.asString()); + } + if (propagate) { assertTrue(headers.containsKey(census.statsHeader)); } else { @@ -524,7 +597,7 @@ public class CensusModulesTest { } ServerStreamTracer serverTracer = - census.getServerTracerFactory(recordStats).newServerStreamTracer( + census.getServerTracerFactory(recordStats, recordStats).newServerStreamTracer( method.getFullMethodName(), headers); // Server tracer deserializes clientCtx from the headers, so that it records stats with the // propagated tags. @@ -540,15 +613,26 @@ public class CensusModulesTest { serverTracer.streamClosed(Status.OK); if (recordStats) { + // Server upstart record StatsTestUtils.MetricsRecord serverRecord = statsRecorder.pollRecord(); assertNotNull(serverRecord); assertNoClientContent(serverRecord); + assertEquals(2, serverRecord.tags.size()); TagValue serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), serverMethodTag.asString()); + TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG); + assertEquals("extra-tag-value-897", serverPropagatedTag.asString()); + + // Server completion record + serverRecord = statsRecorder.pollRecord(); + assertNotNull(serverRecord); + assertNoClientContent(serverRecord); + serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), serverMethodTag.asString()); TagValue serverStatusTag = serverRecord.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.OK.toString(), serverStatusTag.asString()); assertNull(serverRecord.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); - TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG); + serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG); assertEquals("extra-tag-value-897", serverPropagatedTag.asString()); } @@ -557,6 +641,7 @@ public class CensusModulesTest { callTracer.callEnded(Status.OK); if (recordStats) { + // Client completion record StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord(); assertNotNull(clientRecord); assertNoServerContent(clientRecord); @@ -577,7 +662,7 @@ public class CensusModulesTest { @Test public void statsHeadersNotPropagateDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), false, false); Metadata headers = new Metadata(); callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); assertFalse(headers.containsKey(censusStats.statsHeader)); @@ -657,11 +742,43 @@ public class CensusModulesTest { } @Test - public void serverBasicStatsNoHeaders() { - ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory(true); + public void serverBasicStatsNoHeaders_startsAndFinishes() { + subtestServerBasicStatsNoHeaders(true, true); + } + + @Test + public void serverBasicStatsNoHeaders_startsOnly() { + subtestServerBasicStatsNoHeaders(true, false); + } + + @Test + public void serverBasicStatsNoHeaders_finishesOnly() { + subtestServerBasicStatsNoHeaders(false, true); + } + + @Test + public void serverBasicStatsNoHeaders_neither() { + subtestServerBasicStatsNoHeaders(false, false); + } + + private void subtestServerBasicStatsNoHeaders(boolean recordStarts, boolean recordFinishes) { + ServerStreamTracer.Factory tracerFactory = + censusStats.getServerTracerFactory(recordStarts, recordFinishes); ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); + if (recordStarts) { + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + assertNotNull(record); + assertNoClientContent(record); + assertEquals(1, record.tags.size()); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_STARTED_COUNT)); + } else { + assertNull(statsRecorder.pollRecord()); + } + Context filteredContext = tracer.filterContext(Context.ROOT); TagContext statsCtx = TAG_CONTEXT_KEY.get(filteredContext); assertEquals( @@ -694,27 +811,32 @@ public class CensusModulesTest { tracer.streamClosed(Status.CANCELLED); - StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); - assertNotNull(record); - assertNoClientContent(record); - TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); - assertEquals(method.getFullMethodName(), methodTag.asString()); - TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); - assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString()); - assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); - assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); - assertEquals( - 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); - assertEquals( - 1128 + 865, - record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); - assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); - assertEquals( - 34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); - assertEquals(67 + 552, - record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(100 + 16 + 24, - record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); + if (recordFinishes) { + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); + assertNotNull(record); + assertNoClientContent(record); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_FINISHED_COUNT)); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertEquals( + 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); + assertEquals( + 1128 + 865, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertEquals( + 34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); + assertEquals(67 + 552, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(100 + 16 + 24, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); + } else { + assertNull(statsRecorder.pollRecord()); + } } @Test diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index ce734f4f5..6e52ad7e8 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -719,12 +719,15 @@ public abstract class AbstractInteropTest { Status.fromThrowable(responseObserver.getError()).getCode()); if (metricsExpected()) { + MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingInputCall"); // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // recorded. The tracer stats rely on the stream being created, which is not always the case // in this test. Therefore we don't check the tracer stats. - MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); - checkTags( - clientRecord, "grpc.testing.TestService/StreamingInputCall", Status.CANCELLED.getCode()); + MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkEndTags( + clientEndRecord, "grpc.testing.TestService/StreamingInputCall", + Status.CANCELLED.getCode()); // Do not check server-side metrics, because the status on the server side is undetermined. } } @@ -1044,9 +1047,11 @@ public abstract class AbstractInteropTest { if (metricsExpected()) { // Stream may not have been created before deadline is exceeded, thus we don't test the tracer // stats. - MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); - checkTags( - clientRecord, + MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall"); + MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkEndTags( + clientEndRecord, "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); // Do not check server-side metrics, because the status on the server side is undetermined. @@ -1078,9 +1083,11 @@ public abstract class AbstractInteropTest { if (metricsExpected()) { // Stream may not have been created when deadline is exceeded, thus we don't check tracer // stats. - MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); - checkTags( - clientRecord, + MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall"); + MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkEndTags( + clientEndRecord, "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); // Do not check server-side metrics, because the status on the server side is undetermined. @@ -1103,9 +1110,12 @@ public abstract class AbstractInteropTest { // recorded. The tracer stats rely on the stream being created, which is not the case if // deadline is exceeded before the call is created. Therefore we don't check the tracer stats. if (metricsExpected()) { - MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); - checkTags( - clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); + MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall"); + MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkEndTags( + clientEndRecord, "grpc.testing.TestService/EmptyCall", + Status.DEADLINE_EXCEEDED.getCode()); } // warm up the channel @@ -1120,9 +1130,12 @@ public abstract class AbstractInteropTest { } assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); if (metricsExpected()) { - MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); - checkTags( - clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); + MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall"); + MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkEndTags( + clientEndRecord, "grpc.testing.TestService/EmptyCall", + Status.DEADLINE_EXCEEDED.getCode()); } } @@ -1540,9 +1553,11 @@ public abstract class AbstractInteropTest { // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // recorded. The tracer stats rely on the stream being created, which is not always the case // in this test, thus we will not check that. - MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); - checkTags( - clientRecord, + MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkStartTags(clientStartRecord, "grpc.testing.TestService/FullDuplexCall"); + MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + checkEndTags( + clientEndRecord, "grpc.testing.TestService/FullDuplexCall", Status.DEADLINE_EXCEEDED.getCode()); } @@ -1803,11 +1818,13 @@ public abstract class AbstractInteropTest { if (metricsExpected()) { // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be // done before application receives status. - MetricsRecord clientRecord = clientStatsRecorder.pollRecord(); - checkTags(clientRecord, method, code); + MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(); + checkStartTags(clientStartRecord, method); + MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(); + checkEndTags(clientEndRecord, method, code); if (requests != null && responses != null) { - checkCensus(clientRecord, false, requests, responses); + checkCensus(clientEndRecord, false, requests, responses); } } } @@ -1831,21 +1848,24 @@ public abstract class AbstractInteropTest { // tests. The best we can do here is to exhaust all records and find one that matches the // given conditions. while (true) { - MetricsRecord serverRecord; + MetricsRecord serverStartRecord; + MetricsRecord serverEndRecord; try { // On the server, the stats is finalized in ServerStreamListener.closed(), which can be // run after the client receives the final status. So we use a timeout. - serverRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + serverStartRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); + serverEndRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } - if (serverRecord == null) { + if (serverEndRecord == null) { break; } try { - checkTags(serverRecord, method, code); + checkStartTags(serverStartRecord, method); + checkEndTags(serverEndRecord, method, code); if (requests != null && responses != null) { - checkCensus(serverRecord, true, requests, responses); + checkCensus(serverEndRecord, true, requests, responses); } passed = true; break; @@ -1899,7 +1919,14 @@ public abstract class AbstractInteropTest { } } - private static void checkTags( + private static void checkStartTags(MetricsRecord record, String methodName) { + assertNotNull("record is not null", record); + TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); + assertNotNull("method name tagged", methodNameTag); + assertEquals("method names match", methodName, methodNameTag.asString()); + } + + private static void checkEndTags( MetricsRecord record, String methodName, Status.Code status) { assertNotNull("record is not null", record); TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java index 77fc9405d..e4a96c74e 100644 --- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java @@ -104,6 +104,11 @@ public class StatsTestUtils { } return longValue; } + + @Override + public String toString() { + return "[tags=" + tags + ", metrics=" + metrics + "]"; + } } /** |