aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java3
-rw-r--r--core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java3
-rw-r--r--core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java22
-rw-r--r--core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java22
-rw-r--r--core/src/main/java/io/grpc/internal/CensusStatsModule.java84
-rw-r--r--core/src/test/java/io/grpc/internal/CensusModulesTest.java232
-rw-r--r--interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java81
-rw-r--r--testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java5
8 files changed, 324 insertions, 128 deletions
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 5a01e2ade..41bd6b8e1 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -71,7 +71,8 @@ public final class InProcessChannelBuilder extends
this.name = Preconditions.checkNotNull(name, "name");
// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
- setRecordStats(false);
+ setStatsRecordStartedRpcs(false);
+ setStatsRecordFinishedRpcs(false);
}
@Override
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
index aec1df43c..85f7813a8 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java
@@ -81,7 +81,8 @@ public final class InProcessServerBuilder
this.name = Preconditions.checkNotNull(name, "name");
// In-process transport should not record its traffic to the stats module.
// https://github.com/grpc/grpc-java/issues/2284
- setRecordStats(false);
+ setStatsRecordStartedRpcs(false);
+ setStatsRecordFinishedRpcs(false);
}
@Override
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index ebbe2ca3a..50b217c56 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -144,7 +144,8 @@ public abstract class AbstractManagedChannelImplBuilder
}
private boolean statsEnabled = true;
- private boolean recordStats = true;
+ private boolean recordStartedRpcs = true;
+ private boolean recordFinishedRpcs = true;
private boolean tracingEnabled = true;
@Nullable
@@ -296,11 +297,19 @@ public abstract class AbstractManagedChannelImplBuilder
}
/**
- * Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true.
- * Enabled by default.
+ * Disable or enable stats recording for RPC upstarts. Effective only if {@link
+ * #setStatsEnabled} is set to true. Enabled by default.
*/
- protected void setRecordStats(boolean value) {
- recordStats = value;
+ protected void setStatsRecordStartedRpcs(boolean value) {
+ recordStartedRpcs = value;
+ }
+
+ /**
+ * Disable or enable stats recording for RPC completions. Effective only if {@link
+ * #setStatsEnabled} is set to true. Enabled by default.
+ */
+ protected void setStatsRecordFinishedRpcs(boolean value) {
+ recordFinishedRpcs = value;
}
/**
@@ -348,7 +357,8 @@ public abstract class AbstractManagedChannelImplBuilder
}
// First interceptor runs last (see ClientInterceptors.intercept()), so that no
// other interceptor can override the tracer factory we set in CallOptions.
- effectiveInterceptors.add(0, censusStats.getClientInterceptor(recordStats));
+ effectiveInterceptors.add(
+ 0, censusStats.getClientInterceptor(recordStartedRpcs, recordFinishedRpcs));
}
if (tracingEnabled) {
CensusTracingModule censusTracing =
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 0280d51d5..59e8a4db4 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -103,7 +103,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
private CensusStatsModule censusStatsOverride;
private boolean statsEnabled = true;
- private boolean recordStats = true;
+ private boolean recordStartedRpcs = true;
+ private boolean recordFinishedRpcs = true;
private boolean tracingEnabled = true;
@Override
@@ -207,11 +208,19 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
}
/**
- * Disable or enable stats recording. Effective only if {@link #setStatsEnabled} is set to true.
- * Enabled by default.
+ * Disable or enable stats recording for RPC upstarts. Effective only if {@link
+ * #setStatsEnabled} is set to true. Enabled by default.
*/
- protected void setRecordStats(boolean value) {
- recordStats = value;
+ protected void setStatsRecordStartedRpcs(boolean value) {
+ recordStartedRpcs = value;
+ }
+
+ /**
+ * Disable or enable stats recording for RPC completions. Effective only if {@link
+ * #setStatsEnabled} is set to true. Enabled by default.
+ */
+ protected void setStatsRecordFinishedRpcs(boolean value) {
+ recordFinishedRpcs = value;
}
/**
@@ -242,7 +251,8 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
if (censusStats == null) {
censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true);
}
- tracerFactories.add(censusStats.getServerTracerFactory(recordStats));
+ tracerFactories.add(
+ censusStats.getServerTracerFactory(recordStartedRpcs, recordFinishedRpcs));
}
if (tracingEnabled) {
CensusTracingModule censusTracing =
diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java
index 524f3a399..37c658af6 100644
--- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java
+++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java
@@ -16,7 +16,6 @@
package io.grpc.internal;
-import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
@@ -53,7 +52,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.annotation.Nullable;
/**
* Provides factories for {@link StreamTracer} that records stats to Census.
@@ -133,22 +131,25 @@ public final class CensusStatsModule {
*/
@VisibleForTesting
ClientCallTracer newClientCallTracer(
- TagContext parentCtx, String fullMethodName, boolean recordStats) {
- return new ClientCallTracer(this, parentCtx, fullMethodName, recordStats);
+ TagContext parentCtx, String fullMethodName,
+ boolean recordStartedRpcs, boolean recordFinishedRpcs) {
+ return new ClientCallTracer(
+ this, parentCtx, fullMethodName, recordStartedRpcs, recordFinishedRpcs);
}
/**
* Returns the server tracer factory.
*/
- ServerStreamTracer.Factory getServerTracerFactory(boolean recordStats) {
- return new ServerTracerFactory(recordStats);
+ ServerStreamTracer.Factory getServerTracerFactory(
+ boolean recordStartedRpcs, boolean recordFinishedRpcs) {
+ return new ServerTracerFactory(recordStartedRpcs, recordFinishedRpcs);
}
/**
* Returns the client interceptor that facilitates Census-based stats reporting.
*/
- ClientInterceptor getClientInterceptor(boolean recordStats) {
- return new StatsClientInterceptor(recordStats);
+ ClientInterceptor getClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
+ return new StatsClientInterceptor(recordStartedRpcs, recordFinishedRpcs);
}
private static final class ClientTracer extends ClientStreamTracer {
@@ -221,18 +222,27 @@ public final class CensusStatsModule {
private volatile ClientTracer streamTracer;
private volatile int callEnded;
private final TagContext parentCtx;
- private final boolean recordStats;
+ private final TagContext startCtx;
+ private final boolean recordFinishedRpcs;
ClientCallTracer(
CensusStatsModule module,
TagContext parentCtx,
String fullMethodName,
- boolean recordStats) {
+ boolean recordStartedRpcs,
+ boolean recordFinishedRpcs) {
this.module = module;
- this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
+ this.parentCtx = checkNotNull(parentCtx);
+ this.startCtx =
+ module.tagger.toBuilder(parentCtx)
+ .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)).build();
this.stopwatch = module.stopwatchSupplier.get().start();
- this.recordStats = recordStats;
+ this.recordFinishedRpcs = recordFinishedRpcs;
+ if (recordStartedRpcs) {
+ module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT, 1)
+ .record(startCtx);
+ }
}
@Override
@@ -262,7 +272,7 @@ public final class CensusStatsModule {
if (callEndedUpdater.getAndSet(this, 1) != 0) {
return;
}
- if (!recordStats) {
+ if (!recordFinishedRpcs) {
return;
}
stopwatch.stop();
@@ -272,7 +282,8 @@ public final class CensusStatsModule {
tracer = BLANK_CLIENT_TRACER;
}
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
- // The metrics are in double
+ .put(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT, 1)
+ // The latency is double value
.put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI)
.put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount)
.put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount)
@@ -290,8 +301,7 @@ public final class CensusStatsModule {
measureMap.record(
module
.tagger
- .toBuilder(parentCtx)
- .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName))
+ .toBuilder(startCtx)
.put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
.build());
}
@@ -315,12 +325,11 @@ public final class CensusStatsModule {
private final CensusStatsModule module;
private final String fullMethodName;
- @Nullable
private final TagContext parentCtx;
private volatile int streamClosed;
private final Stopwatch stopwatch;
private final Tagger tagger;
- private final boolean recordStats;
+ private final boolean recordFinishedRpcs;
private volatile long outboundMessageCount;
private volatile long inboundMessageCount;
private volatile long outboundWireSize;
@@ -334,13 +343,18 @@ public final class CensusStatsModule {
TagContext parentCtx,
Supplier<Stopwatch> stopwatchSupplier,
Tagger tagger,
- boolean recordStats) {
+ boolean recordStartedRpcs,
+ boolean recordFinishedRpcs) {
this.module = module;
this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
this.parentCtx = checkNotNull(parentCtx, "parentCtx");
this.stopwatch = stopwatchSupplier.get().start();
this.tagger = tagger;
- this.recordStats = recordStats;
+ this.recordFinishedRpcs = recordFinishedRpcs;
+ if (recordStartedRpcs) {
+ module.statsRecorder.newMeasureMap().put(RpcMeasureConstants.RPC_SERVER_STARTED_COUNT, 1)
+ .record(parentCtx);
+ }
}
@Override
@@ -384,13 +398,14 @@ public final class CensusStatsModule {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
- if (!recordStats) {
+ if (!recordFinishedRpcs) {
return;
}
stopwatch.stop();
long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
MeasureMap measureMap = module.statsRecorder.newMeasureMap()
- // The metrics are in double
+ .put(RpcMeasureConstants.RPC_SERVER_FINISHED_COUNT, 1)
+ // The latency is double value
.put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI)
.put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount)
.put(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount)
@@ -401,11 +416,10 @@ public final class CensusStatsModule {
if (!status.isOk()) {
measureMap.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1);
}
- TagContext ctx = firstNonNull(parentCtx, tagger.empty());
measureMap.record(
module
.tagger
- .toBuilder(ctx)
+ .toBuilder(parentCtx)
.put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString()))
.build());
}
@@ -421,10 +435,12 @@ public final class CensusStatsModule {
@VisibleForTesting
final class ServerTracerFactory extends ServerStreamTracer.Factory {
- private final boolean recordStats;
+ private final boolean recordStartedRpcs;
+ private final boolean recordFinishedRpcs;
- ServerTracerFactory(boolean recordStats) {
- this.recordStats = recordStats;
+ ServerTracerFactory(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
+ this.recordStartedRpcs = recordStartedRpcs;
+ this.recordFinishedRpcs = recordFinishedRpcs;
}
@Override
@@ -444,16 +460,19 @@ public final class CensusStatsModule {
parentCtx,
stopwatchSupplier,
tagger,
- recordStats);
+ recordStartedRpcs,
+ recordFinishedRpcs);
}
}
@VisibleForTesting
final class StatsClientInterceptor implements ClientInterceptor {
- private final boolean recordStats;
+ private final boolean recordStartedRpcs;
+ private final boolean recordFinishedRpcs;
- StatsClientInterceptor(boolean recordStats) {
- this.recordStats = recordStats;
+ StatsClientInterceptor(boolean recordStartedRpcs, boolean recordFinishedRpcs) {
+ this.recordStartedRpcs = recordStartedRpcs;
+ this.recordFinishedRpcs = recordFinishedRpcs;
}
@Override
@@ -462,7 +481,8 @@ public final class CensusStatsModule {
// New RPCs on client-side inherit the tag context from the current Context.
TagContext parentCtx = tagger.getCurrentTagContext();
final ClientCallTracer tracerFactory =
- newClientCallTracer(parentCtx, method.getFullMethodName(), recordStats);
+ newClientCallTracer(parentCtx, method.getFullMethodName(),
+ recordStartedRpcs, recordFinishedRpcs);
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java
index d77b268ce..89b2da42d 100644
--- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java
+++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java
@@ -240,7 +240,7 @@ public class CensusModulesTest {
Channel interceptedChannel =
ClientInterceptors.intercept(
grpcServerRule.getChannel(), callOptionsCaptureInterceptor,
- censusStats.getClientInterceptor(true), censusTracing.getClientInterceptor());
+ censusStats.getClientInterceptor(true, true), censusTracing.getClientInterceptor());
ClientCall<String, String> call;
if (nonDefaultContext) {
Context ctx =
@@ -275,7 +275,20 @@ public class CensusModulesTest {
// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);
- assertNull(statsRecorder.pollRecord());
+
+ StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
+ assertNotNull(record);
+ TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ if (nonDefaultContext) {
+ TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG);
+ assertEquals("extra value", extraTag.asString());
+ assertEquals(2, record.tags.size());
+ } else {
+ assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG));
+ assertEquals(1, record.tags.size());
+ }
+
if (nonDefaultContext) {
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
@@ -297,9 +310,9 @@ public class CensusModulesTest {
assertEquals("No you don't", status.getDescription());
// The intercepting listener calls callEnded() on ClientCallTracer, which records to Census.
- StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
+ record = statsRecorder.pollRecord();
assertNotNull(record);
- TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
+ methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.asString());
@@ -320,12 +333,44 @@ public class CensusModulesTest {
}
@Test
- public void clientBasicStatsDefaultContext() {
+ public void clientBasicStatsDefaultContext_startsAndFinishes() {
+ subtestClientBasicStatsDefaultContext(true, true);
+ }
+
+ @Test
+ public void clientBasicStatsDefaultContext_startsOnly() {
+ subtestClientBasicStatsDefaultContext(true, false);
+ }
+
+ @Test
+ public void clientBasicStatsDefaultContext_finishesOnly() {
+ subtestClientBasicStatsDefaultContext(false, true);
+ }
+
+ @Test
+ public void clientBasicStatsDefaultContext_neither() {
+ subtestClientBasicStatsDefaultContext(false, true);
+ }
+
+ private void subtestClientBasicStatsDefaultContext(boolean recordStarts, boolean recordFinishes) {
CensusStatsModule.ClientCallTracer callTracer =
- censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true);
+ censusStats.newClientCallTracer(
+ tagger.empty(), method.getFullMethodName(), recordStarts, recordFinishes);
Metadata headers = new Metadata();
ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
+ if (recordStarts) {
+ StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
+ assertNotNull(record);
+ assertNoServerContent(record);
+ assertEquals(1, record.tags.size());
+ TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT));
+ } else {
+ assertNull(statsRecorder.pollRecord());
+ }
+
fakeClock.forwardTime(30, MILLISECONDS);
tracer.outboundHeaders();
@@ -349,27 +394,32 @@ public class CensusModulesTest {
tracer.streamClosed(Status.OK);
callTracer.callEnded(Status.OK);
- StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
- assertNotNull(record);
- assertNoServerContent(record);
- TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
- assertEquals(method.getFullMethodName(), methodTag.asString());
- TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
- assertEquals(Status.Code.OK.toString(), statusTag.asString());
- assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT));
- assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT));
- assertEquals(
- 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES));
- assertEquals(
- 1128 + 865,
- record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
- assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT));
- assertEquals(
- 33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES));
- assertEquals(67 + 552,
- record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
- assertEquals(30 + 100 + 16 + 24,
- record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+ if (recordFinishes) {
+ StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
+ assertNotNull(record);
+ assertNoServerContent(record);
+ TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
+ assertEquals(Status.Code.OK.toString(), statusTag.asString());
+ assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT));
+ assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT));
+ assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT));
+ assertEquals(
+ 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES));
+ assertEquals(
+ 1128 + 865,
+ record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+ assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT));
+ assertEquals(
+ 33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES));
+ assertEquals(67 + 552,
+ record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
+ assertEquals(30 + 100 + 16 + 24,
+ record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+ } else {
+ assertNull(statsRecorder.pollRecord());
+ }
}
@Test
@@ -433,18 +483,29 @@ public class CensusModulesTest {
public void clientStreamNeverCreatedStillRecordStats() {
CensusStatsModule.ClientCallTracer callTracer =
censusStats.newClientCallTracer(
- tagger.empty(), method.getFullMethodName(), true);
+ tagger.empty(), method.getFullMethodName(), true, true);
fakeClock.forwardTime(3000, MILLISECONDS);
callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
+ // Upstart record
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
assertNotNull(record);
assertNoServerContent(record);
+ assertEquals(1, record.tags.size());
TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), methodTag.asString());
+ assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_STARTED_COUNT));
+
+ // Completion record
+ record = statsRecorder.pollRecord();
+ assertNotNull(record);
+ assertNoServerContent(record);
+ methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.asString());
+ assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_FINISHED_COUNT));
assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT));
assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT));
assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES));
@@ -513,9 +574,21 @@ public class CensusModulesTest {
propagate);
Metadata headers = new Metadata();
CensusStatsModule.ClientCallTracer callTracer =
- census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats);
+ census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats, recordStats);
// This propagates clientCtx to headers if propagates==true
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
+ if (recordStats) {
+ // Client upstart record
+ StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord();
+ assertNotNull(clientRecord);
+ assertNoServerContent(clientRecord);
+ assertEquals(2, clientRecord.tags.size());
+ TagValue clientMethodTag = clientRecord.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertEquals(method.getFullMethodName(), clientMethodTag.asString());
+ TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
+ assertEquals("extra-tag-value-897", clientPropagatedTag.asString());
+ }
+
if (propagate) {
assertTrue(headers.containsKey(census.statsHeader));
} else {
@@ -524,7 +597,7 @@ public class CensusModulesTest {
}
ServerStreamTracer serverTracer =
- census.getServerTracerFactory(recordStats).newServerStreamTracer(
+ census.getServerTracerFactory(recordStats, recordStats).newServerStreamTracer(
method.getFullMethodName(), headers);
// Server tracer deserializes clientCtx from the headers, so that it records stats with the
// propagated tags.
@@ -540,15 +613,26 @@ public class CensusModulesTest {
serverTracer.streamClosed(Status.OK);
if (recordStats) {
+ // Server upstart record
StatsTestUtils.MetricsRecord serverRecord = statsRecorder.pollRecord();
assertNotNull(serverRecord);
assertNoClientContent(serverRecord);
+ assertEquals(2, serverRecord.tags.size());
TagValue serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_METHOD);
assertEquals(method.getFullMethodName(), serverMethodTag.asString());
+ TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
+ assertEquals("extra-tag-value-897", serverPropagatedTag.asString());
+
+ // Server completion record
+ serverRecord = statsRecorder.pollRecord();
+ assertNotNull(serverRecord);
+ assertNoClientContent(serverRecord);
+ serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertEquals(method.getFullMethodName(), serverMethodTag.asString());
TagValue serverStatusTag = serverRecord.tags.get(RpcMeasureConstants.RPC_STATUS);
assertEquals(Status.Code.OK.toString(), serverStatusTag.asString());
assertNull(serverRecord.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT));
- TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
+ serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
assertEquals("extra-tag-value-897", serverPropagatedTag.asString());
}
@@ -557,6 +641,7 @@ public class CensusModulesTest {
callTracer.callEnded(Status.OK);
if (recordStats) {
+ // Client completion record
StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord();
assertNotNull(clientRecord);
assertNoServerContent(clientRecord);
@@ -577,7 +662,7 @@ public class CensusModulesTest {
@Test
public void statsHeadersNotPropagateDefaultContext() {
CensusStatsModule.ClientCallTracer callTracer =
- censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true);
+ censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), false, false);
Metadata headers = new Metadata();
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
assertFalse(headers.containsKey(censusStats.statsHeader));
@@ -657,11 +742,43 @@ public class CensusModulesTest {
}
@Test
- public void serverBasicStatsNoHeaders() {
- ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory(true);
+ public void serverBasicStatsNoHeaders_startsAndFinishes() {
+ subtestServerBasicStatsNoHeaders(true, true);
+ }
+
+ @Test
+ public void serverBasicStatsNoHeaders_startsOnly() {
+ subtestServerBasicStatsNoHeaders(true, false);
+ }
+
+ @Test
+ public void serverBasicStatsNoHeaders_finishesOnly() {
+ subtestServerBasicStatsNoHeaders(false, true);
+ }
+
+ @Test
+ public void serverBasicStatsNoHeaders_neither() {
+ subtestServerBasicStatsNoHeaders(false, false);
+ }
+
+ private void subtestServerBasicStatsNoHeaders(boolean recordStarts, boolean recordFinishes) {
+ ServerStreamTracer.Factory tracerFactory =
+ censusStats.getServerTracerFactory(recordStarts, recordFinishes);
ServerStreamTracer tracer =
tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
+ if (recordStarts) {
+ StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
+ assertNotNull(record);
+ assertNoClientContent(record);
+ assertEquals(1, record.tags.size());
+ TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_STARTED_COUNT));
+ } else {
+ assertNull(statsRecorder.pollRecord());
+ }
+
Context filteredContext = tracer.filterContext(Context.ROOT);
TagContext statsCtx = TAG_CONTEXT_KEY.get(filteredContext);
assertEquals(
@@ -694,27 +811,32 @@ public class CensusModulesTest {
tracer.streamClosed(Status.CANCELLED);
- StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
- assertNotNull(record);
- assertNoClientContent(record);
- TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
- assertEquals(method.getFullMethodName(), methodTag.asString());
- TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
- assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString());
- assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT));
- assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT));
- assertEquals(
- 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES));
- assertEquals(
- 1128 + 865,
- record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
- assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT));
- assertEquals(
- 34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES));
- assertEquals(67 + 552,
- record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
- assertEquals(100 + 16 + 24,
- record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY));
+ if (recordFinishes) {
+ StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
+ assertNotNull(record);
+ assertNoClientContent(record);
+ TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertEquals(method.getFullMethodName(), methodTag.asString());
+ TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
+ assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString());
+ assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_FINISHED_COUNT));
+ assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT));
+ assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT));
+ assertEquals(
+ 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES));
+ assertEquals(
+ 1128 + 865,
+ record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
+ assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT));
+ assertEquals(
+ 34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES));
+ assertEquals(67 + 552,
+ record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
+ assertEquals(100 + 16 + 24,
+ record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY));
+ } else {
+ assertNull(statsRecorder.pollRecord());
+ }
}
@Test
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
index ce734f4f5..6e52ad7e8 100644
--- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
@@ -719,12 +719,15 @@ public abstract class AbstractInteropTest {
Status.fromThrowable(responseObserver.getError()).getCode());
if (metricsExpected()) {
+ MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingInputCall");
// CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
// recorded. The tracer stats rely on the stream being created, which is not always the case
// in this test. Therefore we don't check the tracer stats.
- MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
- checkTags(
- clientRecord, "grpc.testing.TestService/StreamingInputCall", Status.CANCELLED.getCode());
+ MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkEndTags(
+ clientEndRecord, "grpc.testing.TestService/StreamingInputCall",
+ Status.CANCELLED.getCode());
// Do not check server-side metrics, because the status on the server side is undetermined.
}
}
@@ -1044,9 +1047,11 @@ public abstract class AbstractInteropTest {
if (metricsExpected()) {
// Stream may not have been created before deadline is exceeded, thus we don't test the tracer
// stats.
- MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
- checkTags(
- clientRecord,
+ MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall");
+ MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkEndTags(
+ clientEndRecord,
"grpc.testing.TestService/StreamingOutputCall",
Status.Code.DEADLINE_EXCEEDED);
// Do not check server-side metrics, because the status on the server side is undetermined.
@@ -1078,9 +1083,11 @@ public abstract class AbstractInteropTest {
if (metricsExpected()) {
// Stream may not have been created when deadline is exceeded, thus we don't check tracer
// stats.
- MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
- checkTags(
- clientRecord,
+ MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall");
+ MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkEndTags(
+ clientEndRecord,
"grpc.testing.TestService/StreamingOutputCall",
Status.Code.DEADLINE_EXCEEDED);
// Do not check server-side metrics, because the status on the server side is undetermined.
@@ -1103,9 +1110,12 @@ public abstract class AbstractInteropTest {
// recorded. The tracer stats rely on the stream being created, which is not the case if
// deadline is exceeded before the call is created. Therefore we don't check the tracer stats.
if (metricsExpected()) {
- MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
- checkTags(
- clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode());
+ MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall");
+ MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkEndTags(
+ clientEndRecord, "grpc.testing.TestService/EmptyCall",
+ Status.DEADLINE_EXCEEDED.getCode());
}
// warm up the channel
@@ -1120,9 +1130,12 @@ public abstract class AbstractInteropTest {
}
assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
if (metricsExpected()) {
- MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
- checkTags(
- clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode());
+ MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall");
+ MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkEndTags(
+ clientEndRecord, "grpc.testing.TestService/EmptyCall",
+ Status.DEADLINE_EXCEEDED.getCode());
}
}
@@ -1540,9 +1553,11 @@ public abstract class AbstractInteropTest {
// CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
// recorded. The tracer stats rely on the stream being created, which is not always the case
// in this test, thus we will not check that.
- MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
- checkTags(
- clientRecord,
+ MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkStartTags(clientStartRecord, "grpc.testing.TestService/FullDuplexCall");
+ MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ checkEndTags(
+ clientEndRecord,
"grpc.testing.TestService/FullDuplexCall",
Status.DEADLINE_EXCEEDED.getCode());
}
@@ -1803,11 +1818,13 @@ public abstract class AbstractInteropTest {
if (metricsExpected()) {
// CensusStreamTracerModule records final status in interceptor, which is guaranteed to be
// done before application receives status.
- MetricsRecord clientRecord = clientStatsRecorder.pollRecord();
- checkTags(clientRecord, method, code);
+ MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord();
+ checkStartTags(clientStartRecord, method);
+ MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord();
+ checkEndTags(clientEndRecord, method, code);
if (requests != null && responses != null) {
- checkCensus(clientRecord, false, requests, responses);
+ checkCensus(clientEndRecord, false, requests, responses);
}
}
}
@@ -1831,21 +1848,24 @@ public abstract class AbstractInteropTest {
// tests. The best we can do here is to exhaust all records and find one that matches the
// given conditions.
while (true) {
- MetricsRecord serverRecord;
+ MetricsRecord serverStartRecord;
+ MetricsRecord serverEndRecord;
try {
// On the server, the stats is finalized in ServerStreamListener.closed(), which can be
// run after the client receives the final status. So we use a timeout.
- serverRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ serverStartRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
+ serverEndRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
- if (serverRecord == null) {
+ if (serverEndRecord == null) {
break;
}
try {
- checkTags(serverRecord, method, code);
+ checkStartTags(serverStartRecord, method);
+ checkEndTags(serverEndRecord, method, code);
if (requests != null && responses != null) {
- checkCensus(serverRecord, true, requests, responses);
+ checkCensus(serverEndRecord, true, requests, responses);
}
passed = true;
break;
@@ -1899,7 +1919,14 @@ public abstract class AbstractInteropTest {
}
}
- private static void checkTags(
+ private static void checkStartTags(MetricsRecord record, String methodName) {
+ assertNotNull("record is not null", record);
+ TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
+ assertNotNull("method name tagged", methodNameTag);
+ assertEquals("method names match", methodName, methodNameTag.asString());
+ }
+
+ private static void checkEndTags(
MetricsRecord record, String methodName, Status.Code status) {
assertNotNull("record is not null", record);
TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java
index 77fc9405d..e4a96c74e 100644
--- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java
+++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java
@@ -104,6 +104,11 @@ public class StatsTestUtils {
}
return longValue;
}
+
+ @Override
+ public String toString() {
+ return "[tags=" + tags + ", metrics=" + metrics + "]";
+ }
}
/**