aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDNVindhya <DNVindhya@users.noreply.github.com>2023-03-20 14:18:16 -0700
committerEric Anderson <ejona@google.com>2023-03-21 15:01:21 -0700
commit85ce900dfcb637eb7e90119fda3b22295bc6b74e (patch)
tree254173df430e53401be97c3d79240aa5ef0b0346
parentbb39ca3ec984ac25eb74157411a1f75ece26372b (diff)
downloadgrpc-grpc-java-85ce900dfcb637eb7e90119fda3b22295bc6b74e.tar.gz
gcp-observability, census: add trace information to logs (#9963)
This commit adds trace information (TraceId, SpanId and TraceSampled) fields to LogEntry, when both logging and tracing are enabled in gcp-observability. For server-side logs, span information was readily available using Span.getContext() propagated via `io.grpc.Context`. Similar approach is not feasible for client-side architecture. Client SpanContext which has all the information required to be added to logs is propagated to the logging interceptor via `io.grpc.CallOptions`.
-rw-r--r--census/src/main/java/io/grpc/census/CensusTracingModule.java30
-rw-r--r--census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java5
-rw-r--r--census/src/test/java/io/grpc/census/CensusModulesTest.java21
-rw-r--r--census/src/test/java/io/grpc/census/CensusTracingAnnotationEventTest.java2
-rw-r--r--gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java10
-rw-r--r--gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java25
-rw-r--r--gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java26
-rw-r--r--gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java31
-rw-r--r--gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java30
-rw-r--r--gcp-observability/src/main/java/io/grpc/gcp/observability/logging/Sink.java3
-rw-r--r--gcp-observability/src/main/java/io/grpc/gcp/observability/logging/TraceLoggingHelper.java49
-rw-r--r--gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java14
-rw-r--r--gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java212
-rw-r--r--gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java25
-rw-r--r--gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java102
-rw-r--r--gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java135
-rw-r--r--gcp-observability/src/test/java/io/grpc/gcp/observability/logging/TraceLoggingHelperTest.java91
17 files changed, 686 insertions, 125 deletions
diff --git a/census/src/main/java/io/grpc/census/CensusTracingModule.java b/census/src/main/java/io/grpc/census/CensusTracingModule.java
index 2e9b6b5b6..4fa9e2874 100644
--- a/census/src/main/java/io/grpc/census/CensusTracingModule.java
+++ b/census/src/main/java/io/grpc/census/CensusTracingModule.java
@@ -17,6 +17,7 @@
package io.grpc.census;
import static com.google.common.base.Preconditions.checkNotNull;
+import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
@@ -124,8 +125,8 @@ final class CensusTracingModule {
*/
@VisibleForTesting
CallAttemptsTracerFactory newClientCallTracer(
- @Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
- return new CallAttemptsTracerFactory(parentSpan, method);
+ @Nullable Span clientSpan, MethodDescriptor<?, ?> method) {
+ return new CallAttemptsTracerFactory(clientSpan, method);
}
/**
@@ -248,17 +249,11 @@ final class CensusTracingModule {
private final Span span;
private final String fullMethodName;
- CallAttemptsTracerFactory(@Nullable Span parentSpan, MethodDescriptor<?, ?> method) {
+ CallAttemptsTracerFactory(@Nullable Span clientSpan, MethodDescriptor<?, ?> method) {
checkNotNull(method, "method");
this.isSampledToLocalTracing = method.isSampledToLocalTracing();
this.fullMethodName = method.getFullMethodName();
- this.span =
- censusTracer
- .spanBuilderWithExplicitParent(
- generateTraceSpanName(false, fullMethodName),
- parentSpan)
- .setRecordEvents(true)
- .startSpan();
+ this.span = clientSpan;
}
@Override
@@ -461,13 +456,20 @@ final class CensusTracingModule {
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
- final CallAttemptsTracerFactory tracerFactory =
- newClientCallTracer(
- io.opencensus.trace.unsafe.ContextUtils.getValue(Context.current()), method);
+ Span parentSpan = io.opencensus.trace.unsafe.ContextUtils.getValue(Context.current());
+ Span clientSpan = censusTracer
+ .spanBuilderWithExplicitParent(
+ generateTraceSpanName(false, method.getFullMethodName()),
+ parentSpan)
+ .setRecordEvents(true)
+ .startSpan();
+
+ final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
ClientCall<ReqT, RespT> call =
next.newCall(
method,
- callOptions.withStreamTracerFactory(tracerFactory));
+ callOptions.withStreamTracerFactory(tracerFactory)
+ .withOption(CLIENT_TRACE_SPAN_CONTEXT_KEY, clientSpan.getContext()));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
diff --git a/census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java b/census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java
index a944e4ffd..8dd0bc30b 100644
--- a/census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java
+++ b/census/src/main/java/io/grpc/census/internal/ObservabilityCensusConstants.java
@@ -26,11 +26,13 @@ import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER
import static io.opencensus.contrib.grpc.metrics.RpcMeasureConstants.GRPC_SERVER_STATUS;
import com.google.common.annotations.VisibleForTesting;
+import io.grpc.CallOptions;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.stats.Aggregation;
import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble;
import io.opencensus.stats.View;
+import io.opencensus.trace.SpanContext;
import java.util.Arrays;
// TODO(dnvindhya): Remove metric and view definitions from this class once it is moved to
@@ -42,6 +44,9 @@ import java.util.Arrays;
@VisibleForTesting
public final class ObservabilityCensusConstants {
+ public static CallOptions.Key<SpanContext> CLIENT_TRACE_SPAN_CONTEXT_KEY
+ = CallOptions.Key.createWithDefault("Client span context for tracing", SpanContext.INVALID);
+
static final Aggregation AGGREGATION_WITH_BYTES_HISTOGRAM =
RpcViewConstants.GRPC_CLIENT_SENT_BYTES_PER_RPC_VIEW.getAggregation();
diff --git a/census/src/test/java/io/grpc/census/CensusModulesTest.java b/census/src/test/java/io/grpc/census/CensusModulesTest.java
index 97d96dc59..692395b36 100644
--- a/census/src/test/java/io/grpc/census/CensusModulesTest.java
+++ b/census/src/test/java/io/grpc/census/CensusModulesTest.java
@@ -22,6 +22,7 @@ import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRIES
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.RETRY_DELAY_PER_CALL;
import static io.grpc.census.CensusStatsModule.CallAttemptsTracerFactory.TRANSPARENT_RETRIES_PER_CALL;
import static io.grpc.census.internal.ObservabilityCensusConstants.API_LATENCY_PER_CALL;
+import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -317,6 +318,10 @@ public class CensusModulesTest {
capturedCallOptions.get().getStreamTracerFactories().get(1)
instanceof CensusStatsModule.CallAttemptsTracerFactory);
+ // The interceptor adds client SpanContext to CallOptions
+ assertTrue(capturedCallOptions.get().getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY).isValid());
+ assertTrue(capturedCallOptions.get().getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY) != null);
+
// Make the call
Metadata headers = new Metadata();
call.start(mockClientCallListener, headers);
@@ -738,13 +743,11 @@ public class CensusModulesTest {
@Test
public void clientBasicTracingDefaultSpan() {
CallAttemptsTracerFactory callTracer =
- censusTracing.newClientCallTracer(null, method);
+ censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
verify(tracer).spanBuilderWithExplicitParent(
- eq("Sent.package1.service2.method3"), ArgumentMatchers.<Span>isNull());
- verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), eq(spyClientSpan));
verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
verify(spyAttemptSpan, never()).end(any(EndSpanOptions.class));
@@ -797,7 +800,7 @@ public class CensusModulesTest {
@Test
public void clientTracingSampledToLocalSpanStore() {
CallAttemptsTracerFactory callTracer =
- censusTracing.newClientCallTracer(null, sampledMethod);
+ censusTracing.newClientCallTracer(spyClientSpan, sampledMethod);
callTracer.callEnded(Status.OK);
verify(spyClientSpan).end(
@@ -867,10 +870,7 @@ public class CensusModulesTest {
@Test
public void clientStreamNeverCreatedStillRecordTracing() {
CallAttemptsTracerFactory callTracer =
- censusTracing.newClientCallTracer(fakeClientParentSpan, method);
- verify(tracer).spanBuilderWithExplicitParent(
- eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
- verify(spyClientSpanBuilder).setRecordEvents(eq(true));
+ censusTracing.newClientCallTracer(spyClientSpan, method);
callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
verify(spyClientSpan).end(
@@ -1046,7 +1046,7 @@ public class CensusModulesTest {
@Test
public void traceHeadersPropagateSpanContext() throws Exception {
CallAttemptsTracerFactory callTracer =
- censusTracing.newClientCallTracer(fakeClientParentSpan, method);
+ censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer streamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
streamTracer.streamCreated(Attributes.EMPTY, headers);
@@ -1054,10 +1054,7 @@ public class CensusModulesTest {
verify(mockTracingPropagationHandler).toByteArray(same(fakeAttemptSpanContext));
verifyNoMoreInteractions(mockTracingPropagationHandler);
verify(tracer).spanBuilderWithExplicitParent(
- eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
- verify(tracer).spanBuilderWithExplicitParent(
eq("Attempt.package1.service2.method3"), same(spyClientSpan));
- verify(spyClientSpanBuilder).setRecordEvents(eq(true));
verifyNoMoreInteractions(tracer);
assertTrue(headers.containsKey(censusTracing.tracingHeader));
diff --git a/census/src/test/java/io/grpc/census/CensusTracingAnnotationEventTest.java b/census/src/test/java/io/grpc/census/CensusTracingAnnotationEventTest.java
index 80f2131c2..8d7d2b2b7 100644
--- a/census/src/test/java/io/grpc/census/CensusTracingAnnotationEventTest.java
+++ b/census/src/test/java/io/grpc/census/CensusTracingAnnotationEventTest.java
@@ -166,7 +166,7 @@ public class CensusTracingAnnotationEventTest {
@Test
public void clientBasicTracingUncompressedSizeAnnotation() {
CallAttemptsTracerFactory callTracer =
- censusTracing.newClientCallTracer(null, method);
+ censusTracing.newClientCallTracer(spyClientSpan, method);
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
clientStreamTracer.streamCreated(Attributes.EMPTY, headers);
diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java
index 14b5c29aa..9470de770 100644
--- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java
+++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java
@@ -36,6 +36,7 @@ import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
+import io.grpc.gcp.observability.logging.TraceLoggingHelper;
import io.opencensus.common.Duration;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
@@ -58,7 +59,8 @@ import java.util.stream.Collectors;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class GcpObservability implements AutoCloseable {
private static final int METRICS_EXPORT_INTERVAL = 30;
- private static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
+ @VisibleForTesting
+ static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
"google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService",
"google.devtools.cloudtrace.v2.TraceService");
private static GcpObservability instance = null;
@@ -77,9 +79,11 @@ public final class GcpObservability implements AutoCloseable {
if (instance == null) {
GlobalLocationTags globalLocationTags = new GlobalLocationTags();
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
+ TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(
+ observabilityConfig.getProjectId());
Sink sink = new GcpLogSink(observabilityConfig.getProjectId(),
- globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
- SERVICES_TO_EXCLUDE);
+ globalLocationTags.getLocationTags(), observabilityConfig,
+ SERVICES_TO_EXCLUDE, traceLoggingHelper);
LogHelper helper = new LogHelper(sink);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.getInstance(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java
index 517745a5a..f5082564d 100644
--- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java
+++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java
@@ -16,6 +16,8 @@
package io.grpc.gcp.observability.interceptors;
+import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
+
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.CallOptions;
@@ -33,6 +35,7 @@ import io.grpc.Status;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
+import io.opencensus.trace.SpanContext;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -92,6 +95,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
// Get the stricter deadline to calculate the timeout once the call starts
final Deadline deadline = LogHelper.min(callOptions.getDeadline(),
Context.current().getDeadline());
+ final SpanContext clientSpanContext = callOptions.getOption(CLIENT_TRACE_SPAN_CONTEXT_KEY);
FilterParams filterParams = filterHelper.logRpcMethod(method.getFullMethodName(), true);
if (!filterParams.log()) {
@@ -122,7 +126,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
maxHeaderBytes,
EventLogger.CLIENT,
callId,
- null);
+ null,
+ clientSpanContext);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
@@ -148,7 +153,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
message,
maxMessageBytes,
EventLogger.CLIENT,
- callId);
+ callId,
+ clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
@@ -168,7 +174,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
maxHeaderBytes,
EventLogger.CLIENT,
callId,
- LogHelper.getPeerAddress(getAttributes()));
+ LogHelper.getPeerAddress(getAttributes()),
+ clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
@@ -189,7 +196,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
maxHeaderBytes,
EventLogger.CLIENT,
callId,
- LogHelper.getPeerAddress(getAttributes()));
+ LogHelper.getPeerAddress(getAttributes()),
+ clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
@@ -212,7 +220,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
message,
maxMessageBytes,
EventLogger.CLIENT,
- callId);
+ callId,
+ clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
@@ -229,7 +238,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
methodName,
authority,
EventLogger.CLIENT,
- callId);
+ callId,
+ clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
@@ -246,7 +256,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
methodName,
authority,
EventLogger.CLIENT,
- callId);
+ callId,
+ clientSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java
index acb8df291..78d488ef5 100644
--- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java
+++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptor.java
@@ -31,6 +31,9 @@ import io.grpc.Status;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
+import io.opencensus.trace.Span;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.unsafe.ContextHandleUtils;
import java.net.SocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -91,6 +94,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
Deadline deadline = Context.current().getDeadline();
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
+ Span span = ContextHandleUtils.getValue(ContextHandleUtils.currentContext());
+ final SpanContext serverSpanContext = span == null ? SpanContext.INVALID : span.getContext();
FilterParams filterParams =
filterHelper.logRpcMethod(call.getMethodDescriptor().getFullMethodName(), false);
@@ -113,7 +118,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
maxHeaderBytes,
EventLogger.SERVER,
callId,
- peerAddress);
+ peerAddress,
+ serverSpanContext);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
@@ -139,7 +145,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
maxHeaderBytes,
EventLogger.SERVER,
callId,
- null);
+ null,
+ serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
@@ -160,7 +167,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
message,
maxMessageBytes,
EventLogger.SERVER,
- callId);
+ callId,
+ serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
@@ -181,7 +189,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
maxHeaderBytes,
EventLogger.SERVER,
callId,
- null);
+ null,
+ serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
@@ -206,7 +215,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
message,
maxMessageBytes,
EventLogger.SERVER,
- callId);
+ callId,
+ serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
@@ -223,7 +233,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
methodName,
authority,
EventLogger.SERVER,
- callId);
+ callId,
+ serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
@@ -240,7 +251,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
methodName,
authority,
EventLogger.SERVER,
- callId);
+ callId,
+ serverSpanContext);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java
index abd44c436..65b539a66 100644
--- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java
+++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java
@@ -36,6 +36,7 @@ import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.Payload;
+import io.opencensus.trace.SpanContext;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
@@ -88,7 +89,8 @@ public class LogHelper {
GrpcLogRecord.EventLogger eventLogger,
String callId,
// null on client side
- @Nullable SocketAddress peerAddress) {
+ @Nullable SocketAddress peerAddress,
+ SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@@ -114,7 +116,7 @@ public class LogHelper {
if (peerAddress != null) {
logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
}
- sink.write(logEntryBuilder.build());
+ sink.write(logEntryBuilder.build(), spanContext);
}
/**
@@ -129,7 +131,8 @@ public class LogHelper {
int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger,
String callId,
- @Nullable SocketAddress peerAddress) {
+ @Nullable SocketAddress peerAddress,
+ SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@@ -155,7 +158,7 @@ public class LogHelper {
if (peerAddress != null) {
logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
}
- sink.write(logEntryBuilder.build());
+ sink.write(logEntryBuilder.build(), spanContext);
}
/**
@@ -171,7 +174,8 @@ public class LogHelper {
int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger,
String callId,
- @Nullable SocketAddress peerAddress) {
+ @Nullable SocketAddress peerAddress,
+ SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@@ -205,7 +209,7 @@ public class LogHelper {
if (peerAddress != null) {
logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
}
- sink.write(logEntryBuilder.build());
+ sink.write(logEntryBuilder.build(), spanContext);
}
/**
@@ -220,7 +224,8 @@ public class LogHelper {
T message,
int maxMessageBytes,
EventLogger eventLogger,
- String callId) {
+ String callId,
+ SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@@ -260,7 +265,7 @@ public class LogHelper {
logEntryBuilder.setPayload(pair.payloadBuilder)
.setPayloadTruncated(pair.truncated);
}
- sink.write(logEntryBuilder.build());
+ sink.write(logEntryBuilder.build(), spanContext);
}
/**
@@ -272,7 +277,8 @@ public class LogHelper {
String methodName,
String authority,
GrpcLogRecord.EventLogger eventLogger,
- String callId) {
+ String callId,
+ SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@@ -286,7 +292,7 @@ public class LogHelper {
.setType(EventType.CLIENT_HALF_CLOSE)
.setLogger(eventLogger)
.setCallId(callId);
- sink.write(logEntryBuilder.build());
+ sink.write(logEntryBuilder.build(), spanContext);
}
/**
@@ -298,7 +304,8 @@ public class LogHelper {
String methodName,
String authority,
GrpcLogRecord.EventLogger eventLogger,
- String callId) {
+ String callId,
+ SpanContext spanContext) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
@@ -312,7 +319,7 @@ public class LogHelper {
.setType(EventType.CANCEL)
.setLogger(eventLogger)
.setCallId(callId);
- sink.write(logEntryBuilder.build());
+ sink.write(logEntryBuilder.build(), spanContext);
}
// TODO(DNVindhya): Evaluate if we need following clause for metadata logging in GcpObservability
diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java
index 02ff4049e..fab38a23d 100644
--- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java
+++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java
@@ -33,8 +33,10 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Internal;
+import io.grpc.gcp.observability.ObservabilityConfig;
import io.grpc.internal.JsonParser;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
+import io.opencensus.trace.SpanContext;
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
@@ -67,11 +69,16 @@ public class GcpLogSink implements Sink {
* logging APIs also uses gRPC. */
private volatile Logging gcpLoggingClient;
private final Collection<String> servicesToExclude;
+ private final boolean isTraceEnabled;
+
+ private final TraceLoggingHelper traceLoggingHelper;
+
@VisibleForTesting
GcpLogSink(Logging loggingClient, String projectId, Map<String, String> locationTags,
- Map<String, String> customTags, Collection<String> servicesToExclude) {
- this(projectId, locationTags, customTags, servicesToExclude);
+ ObservabilityConfig config, Collection<String> servicesToExclude,
+ TraceLoggingHelper traceLoggingHelper) {
+ this(projectId, locationTags, config, servicesToExclude, traceLoggingHelper);
this.gcpLoggingClient = loggingClient;
}
@@ -82,11 +89,14 @@ public class GcpLogSink implements Sink {
* @param servicesToExclude service names for which log entries should not be generated
*/
public GcpLogSink(String projectId, Map<String, String> locationTags,
- Map<String, String> customTags, Collection<String> servicesToExclude) {
+ ObservabilityConfig config, Collection<String> servicesToExclude,
+ TraceLoggingHelper traceLoggingHelper) {
this.projectId = projectId;
- this.customTags = getCustomTags(customTags, locationTags, projectId);
+ this.customTags = getCustomTags(config.getCustomTags(), locationTags, projectId);
this.kubernetesResource = getResource(locationTags);
this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude");
+ this.isTraceEnabled = config.isEnableCloudTracing();
+ this.traceLoggingHelper = traceLoggingHelper;
}
/**
@@ -95,7 +105,7 @@ public class GcpLogSink implements Sink {
* @param logProto gRPC logging proto containing the message to be logged
*/
@Override
- public void write(GrpcLogRecord logProto) {
+ public void write(GrpcLogRecord logProto, SpanContext spanContext) {
if (gcpLoggingClient == null) {
synchronized (this) {
if (gcpLoggingClient == null) {
@@ -122,7 +132,10 @@ public class GcpLogSink implements Sink {
if (!customTags.isEmpty()) {
grpcLogEntryBuilder.setLabels(customTags);
}
+
+ addTraceData(grpcLogEntryBuilder, spanContext);
grpcLogEntry = grpcLogEntryBuilder.build();
+
synchronized (this) {
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", eventType);
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
@@ -139,6 +152,13 @@ public class GcpLogSink implements Sink {
}
}
+ void addTraceData(LogEntry.Builder builder, SpanContext spanContext) {
+ if (!isTraceEnabled) {
+ return;
+ }
+ traceLoggingHelper.enhanceLogEntry(builder, spanContext);
+ }
+
Logging createLoggingClient() {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {
diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/Sink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/Sink.java
index c0908cfe3..a29aab0c4 100644
--- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/Sink.java
+++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/Sink.java
@@ -18,6 +18,7 @@ package io.grpc.gcp.observability.logging;
import io.grpc.Internal;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
+import io.opencensus.trace.SpanContext;
/**
* Sink for GCP observability.
@@ -27,7 +28,7 @@ public interface Sink {
/**
* Writes the {@code message} to the destination.
*/
- void write(GrpcLogRecord message);
+ void write(GrpcLogRecord message, SpanContext spanContext);
/**
* Closes the sink.
diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/TraceLoggingHelper.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/TraceLoggingHelper.java
new file mode 100644
index 000000000..a53f934a5
--- /dev/null
+++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/TraceLoggingHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.gcp.observability.logging;
+
+import com.google.cloud.logging.LogEntry;
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Internal;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.TraceId;
+
+@Internal
+public class TraceLoggingHelper {
+
+ private final String tracePrefix;
+
+ public TraceLoggingHelper(String projectId) {
+ this.tracePrefix = "projects/" + projectId + "/traces/";;
+ }
+
+ @VisibleForTesting
+ void enhanceLogEntry(LogEntry.Builder builder, SpanContext spanContext) {
+ addTracingData(tracePrefix, spanContext, builder);
+ }
+
+ private static void addTracingData(
+ String tracePrefix, SpanContext spanContext, LogEntry.Builder builder) {
+ builder.setTrace(formatTraceId(tracePrefix, spanContext.getTraceId()));
+ builder.setSpanId(spanContext.getSpanId().toLowerBase16());
+ builder.setTraceSampled(spanContext.getTraceOptions().isSampled());
+ }
+
+ private static String formatTraceId(String tracePrefix, TraceId traceId) {
+ return tracePrefix + traceId.toLowerBase16();
+ }
+}
diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java
index 992ccc5db..083b1c888 100644
--- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java
+++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java
@@ -38,9 +38,11 @@ import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
+import io.grpc.gcp.observability.logging.TraceLoggingHelper;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
+import io.opencensus.trace.SpanContext;
import java.io.IOException;
import java.util.Collections;
import java.util.regex.Pattern;
@@ -49,7 +51,9 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@RunWith(JUnit4.class)
@@ -111,10 +115,12 @@ public class LoggingTest {
@Override
public void run() {
+ ObservabilityConfig config = mock(ObservabilityConfig.class);
+ when(config.getCustomTags()).thenReturn(CUSTOM_TAGS);
Sink sink =
new GcpLogSink(
- PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, Collections.emptySet());
- ObservabilityConfig config = mock(ObservabilityConfig.class);
+ PROJECT_ID, LOCATION_TAGS, config, Collections.emptySet(),
+ mock(TraceLoggingHelper.class));
LogHelper spyLogHelper = spy(new LogHelper(sink));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
@@ -237,7 +243,9 @@ public class LoggingTest {
// = 8
assertThat(Mockito.mockingDetails(mockSink).getInvocations().size()).isEqualTo(12);
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
- verify(mockSink, times(12)).write(captor.capture());
+ verify(mockSink, times(12)).write(captor.capture(),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(),
+ ArgumentMatchers.any(SpanContext.class)));
for (GrpcLogRecord record : captor.getAllValues()) {
assertThat(record.getType()).isInstanceOf(GrpcLogRecord.EventType.class);
assertThat(record.getLogger()).isInstanceOf(GrpcLogRecord.EventLogger.class);
diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java
index 2a2e1d4c2..7458a5329 100644
--- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java
+++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptorTest.java
@@ -17,6 +17,7 @@
package io.grpc.gcp.observability.interceptors;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.census.internal.ObservabilityCensusConstants.CLIENT_TRACE_SPAN_CONTEXT_KEY;
import static io.grpc.gcp.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
@@ -52,6 +53,11 @@ import io.grpc.internal.NoopClientCall;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.SpanId;
+import io.opencensus.trace.TraceId;
+import io.opencensus.trace.TraceOptions;
+import io.opencensus.trace.Tracestate;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -83,6 +89,14 @@ public class InternalLoggingChannelInterceptorTest {
public final MockitoRule mockito = MockitoJUnit.rule();
private static final Charset US_ASCII = StandardCharsets.US_ASCII;
+ private static final SpanContext DEFAULT_CLIENT_SPAN_CONTEXT = SpanContext.INVALID;
+ private static final SpanContext SPAN_CONTEXT = SpanContext.create(
+ TraceId.fromLowerBase16("4c6af40c499951eb7de2777ba1e4fefa"),
+ SpanId.fromLowerBase16("de52e84d13dd232d"),
+ TraceOptions.builder().setIsSampled(true).build(),
+ Tracestate.builder().build());
+ private static final CallOptions CALL_OPTIONS_WITH_SPAN_CONTEXT =
+ CallOptions.DEFAULT.withOption(CLIENT_TRACE_SPAN_CONTEXT_KEY, SPAN_CONTEXT);
private InternalLoggingChannelInterceptor.Factory factory;
private AtomicReference<ClientCall.Listener<byte[]>> interceptedListener;
@@ -192,7 +206,8 @@ public class InternalLoggingChannelInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.CLIENT),
anyString(),
- ArgumentMatchers.isNull());
+ ArgumentMatchers.isNull(),
+ eq(DEFAULT_CLIENT_SPAN_CONTEXT));
verifyNoMoreInteractions(mockLogHelper);
assertSame(clientInitial, actualClientInitial.get());
}
@@ -213,7 +228,8 @@ public class InternalLoggingChannelInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.CLIENT),
anyString(),
- same(peer));
+ same(peer),
+ any(SpanContext.class));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onHeaders(same(serverInitial));
}
@@ -234,7 +250,8 @@ public class InternalLoggingChannelInterceptorTest {
same(request),
eq(filterParams.messageBytes()),
eq(EventLogger.CLIENT),
- anyString());
+ anyString(),
+ any(SpanContext.class));
verifyNoMoreInteractions(mockLogHelper);
assertSame(request, actualRequest.get());
}
@@ -251,7 +268,8 @@ public class InternalLoggingChannelInterceptorTest {
eq("method"),
eq("the-authority"),
eq(EventLogger.CLIENT),
- anyString());
+ anyString(),
+ any(SpanContext.class));
halfCloseCalled.get(1, TimeUnit.MILLISECONDS);
verifyNoMoreInteractions(mockLogHelper);
}
@@ -272,7 +290,8 @@ public class InternalLoggingChannelInterceptorTest {
same(response),
eq(filterParams.messageBytes()),
eq(EventLogger.CLIENT),
- anyString());
+ anyString(),
+ any(SpanContext.class));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onMessage(same(response));
}
@@ -295,7 +314,8 @@ public class InternalLoggingChannelInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.CLIENT),
anyString(),
- same(peer));
+ same(peer),
+ any(SpanContext.class));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onClose(same(status), same(trailers));
}
@@ -312,7 +332,8 @@ public class InternalLoggingChannelInterceptorTest {
eq("method"),
eq("the-authority"),
eq(EventLogger.CLIENT),
- anyString());
+ anyString(),
+ any(SpanContext.class));
cancelCalled.get(1, TimeUnit.MILLISECONDS);
}
}
@@ -363,7 +384,8 @@ public class InternalLoggingChannelInterceptorTest {
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
- ArgumentMatchers.any()));
+ ArgumentMatchers.any()),
+ any(SpanContext.class));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
@@ -422,7 +444,8 @@ public class InternalLoggingChannelInterceptorTest {
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
- ArgumentMatchers.any()));
+ ArgumentMatchers.any()),
+ any(SpanContext.class));
Duration timeout = contextTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
@@ -484,7 +507,8 @@ public class InternalLoggingChannelInterceptorTest {
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
- ArgumentMatchers.any()));
+ ArgumentMatchers.any()),
+ any(SpanContext.class));
Duration timeout = timeoutCaptor.getValue();
assertThat(LogHelper.min(contextDeadline, callOptionsDeadline))
.isSameInstanceAs(contextDeadline);
@@ -633,4 +657,172 @@ public class InternalLoggingChannelInterceptorTest {
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(7);
}
}
+
+ @Test
+ public void clientSpanContextLogged_contextSetViaCallOption() {
+ Channel channel = new Channel() {
+ @Override
+ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
+ MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
+ return new NoopClientCall<RequestT, ResponseT>() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void start(Listener<ResponseT> responseListener, Metadata headers) {
+ interceptedListener.set((Listener<byte[]>) responseListener);
+ actualClientInitial.set(headers);
+ }
+
+ @Override
+ public void sendMessage(RequestT message) {
+ actualRequest.set(message);
+ }
+
+ @Override
+ public void cancel(String message, Throwable cause) {
+ cancelCalled.set(null);
+ }
+
+ @Override
+ public void halfClose() {
+ halfCloseCalled.set(null);
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
+ }
+ };
+ }
+
+ @Override
+ public String authority() {
+ return "the-authority";
+ }
+ };
+
+ @SuppressWarnings("unchecked")
+ ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
+
+ MethodDescriptor<byte[], byte[]> method =
+ MethodDescriptor.<byte[], byte[]>newBuilder()
+ .setType(MethodType.UNKNOWN)
+ .setFullMethodName("service/method")
+ .setRequestMarshaller(BYTEARRAY_MARSHALLER)
+ .setResponseMarshaller(BYTEARRAY_MARSHALLER)
+ .build();
+ when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
+ .thenReturn(FilterParams.create(true, 10, 10));
+
+ ClientCall<byte[], byte[]> interceptedLoggingCall =
+ factory.create()
+ .interceptCall(method,
+ CALL_OPTIONS_WITH_SPAN_CONTEXT,
+ channel);
+
+ {
+ interceptedLoggingCall.start(mockListener, new Metadata());
+ ArgumentCaptor<SpanContext> callOptSpanContextCaptor = ArgumentCaptor.forClass(
+ SpanContext.class);
+ verify(mockLogHelper, times(1))
+ .logClientHeader(
+ anyLong(),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
+ ArgumentMatchers.isNull(),
+ any(Metadata.class),
+ anyInt(),
+ any(GrpcLogRecord.EventLogger.class),
+ anyString(),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(),
+ ArgumentMatchers.any()),
+ callOptSpanContextCaptor.capture());
+ SpanContext spanContext = callOptSpanContextCaptor.getValue();
+ assertThat(spanContext).isEqualTo(SPAN_CONTEXT);
+ }
+ }
+
+ @Test
+ public void clientSpanContextLogged_contextNotSetViaCallOption() {
+ Channel channel = new Channel() {
+ @Override
+ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
+ MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
+ return new NoopClientCall<RequestT, ResponseT>() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void start(Listener<ResponseT> responseListener, Metadata headers) {
+ interceptedListener.set((Listener<byte[]>) responseListener);
+ actualClientInitial.set(headers);
+ }
+
+ @Override
+ public void sendMessage(RequestT message) {
+ actualRequest.set(message);
+ }
+
+ @Override
+ public void cancel(String message, Throwable cause) {
+ cancelCalled.set(null);
+ }
+
+ @Override
+ public void halfClose() {
+ halfCloseCalled.set(null);
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
+ }
+ };
+ }
+
+ @Override
+ public String authority() {
+ return "the-authority";
+ }
+ };
+
+ @SuppressWarnings("unchecked")
+ ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
+
+ MethodDescriptor<byte[], byte[]> method =
+ MethodDescriptor.<byte[], byte[]>newBuilder()
+ .setType(MethodType.UNKNOWN)
+ .setFullMethodName("service/method")
+ .setRequestMarshaller(BYTEARRAY_MARSHALLER)
+ .setResponseMarshaller(BYTEARRAY_MARSHALLER)
+ .build();
+ when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
+ .thenReturn(FilterParams.create(true, 10, 10));
+
+ ClientCall<byte[], byte[]> interceptedLoggingCall =
+ factory.create()
+ .interceptCall(method,
+ CallOptions.DEFAULT,
+ channel);
+
+ {
+ interceptedLoggingCall.start(mockListener, new Metadata());
+ ArgumentCaptor<SpanContext> callOptSpanContextCaptor = ArgumentCaptor.forClass(
+ SpanContext.class);
+ verify(mockLogHelper, times(1))
+ .logClientHeader(
+ anyLong(),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
+ ArgumentMatchers.isNull(),
+ any(Metadata.class),
+ anyInt(),
+ any(GrpcLogRecord.EventLogger.class),
+ anyString(),
+ AdditionalMatchers.or(ArgumentMatchers.isNull(),
+ ArgumentMatchers.any()),
+ callOptSpanContextCaptor.capture());
+ SpanContext spanContext = callOptSpanContextCaptor.getValue();
+ assertThat(spanContext).isEqualTo(DEFAULT_CLIENT_SPAN_CONTEXT);
+ }
+ }
}
diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java
index fee936dfb..fc4eec7f4 100644
--- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java
+++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/InternalLoggingServerInterceptorTest.java
@@ -45,6 +45,7 @@ import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.internal.NoopServerCall;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
+import io.opencensus.trace.SpanContext;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -171,7 +172,8 @@ public class InternalLoggingServerInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.SERVER),
anyString(),
- same(peer));
+ same(peer),
+ eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
}
@@ -191,7 +193,8 @@ public class InternalLoggingServerInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.SERVER),
anyString(),
- ArgumentMatchers.isNull());
+ ArgumentMatchers.isNull(),
+ eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
assertSame(serverInitial, actualServerInitial.get());
}
@@ -212,7 +215,8 @@ public class InternalLoggingServerInterceptorTest {
same(request),
eq(filterParams.messageBytes()),
eq(EventLogger.SERVER),
- anyString());
+ anyString(),
+ eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onMessage(same(request));
}
@@ -229,7 +233,8 @@ public class InternalLoggingServerInterceptorTest {
eq("method"),
eq("the-authority"),
eq(EventLogger.SERVER),
- anyString());
+ anyString(),
+ eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onHalfClose();
}
@@ -250,7 +255,8 @@ public class InternalLoggingServerInterceptorTest {
same(response),
eq(filterParams.messageBytes()),
eq(EventLogger.SERVER),
- anyString());
+ anyString(),
+ eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
assertSame(response, actualResponse.get());
}
@@ -273,7 +279,8 @@ public class InternalLoggingServerInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.SERVER),
anyString(),
- ArgumentMatchers.isNull());
+ ArgumentMatchers.isNull(),
+ eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
assertSame(status, actualStatus.get());
assertSame(trailers, actualTrailers.get());
@@ -291,7 +298,8 @@ public class InternalLoggingServerInterceptorTest {
eq("method"),
eq("the-authority"),
eq(EventLogger.SERVER),
- anyString());
+ anyString(),
+ eq(SpanContext.INVALID));
verify(mockListener).onCancel();
}
}
@@ -342,7 +350,8 @@ public class InternalLoggingServerInterceptorTest {
eq(filterParams.headerBytes()),
eq(EventLogger.SERVER),
anyString(),
- ArgumentMatchers.isNull());
+ ArgumentMatchers.isNull(),
+ eq(SpanContext.INVALID));
verifyNoMoreInteractions(mockLogHelper);
Duration timeout = timeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java
index a6d9fab70..fb1f0e894 100644
--- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java
+++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java
@@ -43,6 +43,11 @@ import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.Payload;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.SpanId;
+import io.opencensus.trace.TraceId;
+import io.opencensus.trace.TraceOptions;
+import io.opencensus.trace.Tracestate;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -75,11 +80,22 @@ public class LogHelperTest {
Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER);
private static final int HEADER_LIMIT = 10;
private static final int MESSAGE_LIMIT = Integer.MAX_VALUE;
+ private static final SpanContext CLIENT_SPAN_CONTEXT = SpanContext.create(
+ TraceId.fromLowerBase16("4c6af40c499951eb7de2777ba1e4fefa"),
+ SpanId.fromLowerBase16("de52e84d13dd232d"),
+ TraceOptions.builder().setIsSampled(true).build(),
+ Tracestate.builder().build());
+ private static final SpanContext SERVER_SPAN_CONTEXT = SpanContext.create(
+ TraceId.fromLowerBase16("549a8a64db2d0c757fdf6bb1bfe84e2c"),
+ SpanId.fromLowerBase16("a5b7704614fe903d"),
+ TraceOptions.builder().setIsSampled(true).build(),
+ Tracestate.builder().build());
private final Metadata nonEmptyMetadata = new Metadata();
private final Sink sink = mock(GcpLogSink.class);
private final LogHelper logHelper = new LogHelper(sink);
+
@Before
public void setUp() {
nonEmptyMetadata.put(KEY_A, DATA_A);
@@ -288,8 +304,9 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
- null);
- verify(sink).write(base);
+ null,
+ CLIENT_SPAN_CONTEXT);
+ verify(sink).write(base, CLIENT_SPAN_CONTEXT);
}
// logged on server
@@ -304,12 +321,14 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.SERVER,
callId,
- peerAddress);
+ peerAddress,
+ SERVER_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setPeer(LogHelper.socketAddressToProto(peerAddress))
.setLogger(EventLogger.SERVER)
- .build());
+ .build(),
+ SERVER_SPAN_CONTEXT);
}
// timeout is null
@@ -324,11 +343,13 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
- null);
+ null,
+ CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setPayload(base.getPayload().toBuilder().clearTimeout().build())
- .build());
+ .build(),
+ CLIENT_SPAN_CONTEXT);
}
// peerAddress is not null (error on client)
@@ -343,7 +364,8 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
- peerAddress);
+ peerAddress,
+ CLIENT_SPAN_CONTEXT);
fail();
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().contains("peerAddress can only be specified by server");
@@ -386,8 +408,9 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
- peerAddress);
- verify(sink).write(base);
+ peerAddress,
+ CLIENT_SPAN_CONTEXT);
+ verify(sink).write(base, CLIENT_SPAN_CONTEXT);
}
// logged on server
@@ -401,12 +424,14 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.SERVER,
callId,
- null);
+ null,
+ SERVER_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setLogger(EventLogger.SERVER)
.clearPeer()
- .build());
+ .build(),
+ SERVER_SPAN_CONTEXT);
}
// peerAddress is not null (error on server)
@@ -420,7 +445,8 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.SERVER,
callId,
- peerAddress);
+ peerAddress,
+ SERVER_SPAN_CONTEXT);
fail();
} catch (IllegalArgumentException expected) {
@@ -472,8 +498,9 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
- peer);
- verify(sink).write(base);
+ peer,
+ CLIENT_SPAN_CONTEXT);
+ verify(sink).write(base, CLIENT_SPAN_CONTEXT);
}
// logged on server
@@ -488,12 +515,14 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.SERVER,
callId,
- null);
+ null,
+ SERVER_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.clearPeer()
.setLogger(EventLogger.SERVER)
- .build());
+ .build(),
+ SERVER_SPAN_CONTEXT);
}
// peer address is null
@@ -508,11 +537,13 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
- null);
+ null,
+ CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.clearPeer()
- .build());
+ .build(),
+ CLIENT_SPAN_CONTEXT);
}
// status description is null
@@ -527,11 +558,13 @@ public class LogHelperTest {
HEADER_LIMIT,
EventLogger.CLIENT,
callId,
- peer);
+ peer,
+ CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setPayload(base.getPayload().toBuilder().clearStatusMessage().build())
- .build());
+ .build(),
+ CLIENT_SPAN_CONTEXT);
}
}
@@ -591,8 +624,9 @@ public class LogHelperTest {
message,
MESSAGE_LIMIT,
EventLogger.CLIENT,
- callId);
- verify(sink).write(base);
+ callId,
+ CLIENT_SPAN_CONTEXT);
+ verify(sink).write(base, CLIENT_SPAN_CONTEXT);
}
// response message, logged on client
{
@@ -605,11 +639,13 @@ public class LogHelperTest {
message,
MESSAGE_LIMIT,
EventLogger.CLIENT,
- callId);
+ callId,
+ CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setType(EventType.SERVER_MESSAGE)
- .build());
+ .build(),
+ CLIENT_SPAN_CONTEXT);
}
// request message, logged on server
{
@@ -622,11 +658,13 @@ public class LogHelperTest {
message,
MESSAGE_LIMIT,
EventLogger.SERVER,
- callId);
+ callId,
+ SERVER_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.setLogger(EventLogger.SERVER)
- .build());
+ .build(),
+ SERVER_SPAN_CONTEXT);
}
// response message, logged on server
{
@@ -639,12 +677,14 @@ public class LogHelperTest {
message,
MESSAGE_LIMIT,
EventLogger.SERVER,
- callId);
+ callId,
+ SpanContext.INVALID);
verify(sink).write(
base.toBuilder()
.setType(EventType.SERVER_MESSAGE)
.setLogger(EventLogger.SERVER)
- .build());
+ .build(),
+ SpanContext.INVALID);
}
// message is not of type : com.google.protobuf.Message or byte[]
{
@@ -657,12 +697,14 @@ public class LogHelperTest {
"message",
MESSAGE_LIMIT,
EventLogger.CLIENT,
- callId);
+ callId,
+ CLIENT_SPAN_CONTEXT);
verify(sink).write(
base.toBuilder()
.clearPayload()
.clearPayloadTruncated()
- .build());
+ .build(),
+ CLIENT_SPAN_CONTEXT);
}
}
diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java
index e02cc6dd4..fc7d72a70 100644
--- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java
+++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
import com.google.cloud.MonitoredResource;
import com.google.cloud.logging.LogEntry;
@@ -31,9 +32,15 @@ import com.google.protobuf.Duration;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
+import io.grpc.gcp.observability.ObservabilityConfig;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.SpanId;
+import io.opencensus.trace.TraceId;
+import io.opencensus.trace.TraceOptions;
+import io.opencensus.trace.Tracestate;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -105,13 +112,16 @@ public class GcpLogSinkTest {
.build();
@Mock
private Logging mockLogging;
+ @Mock
+ private ObservabilityConfig mockConfig;
@Test
@SuppressWarnings("unchecked")
public void verifyWrite() throws Exception {
+ when(mockConfig.getCustomTags()).thenReturn(CUSTOM_TAGS);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
- CUSTOM_TAGS, Collections.emptySet());
- sink.write(LOG_PROTO);
+ mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
+ sink.write(LOG_PROTO, null);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@@ -127,10 +137,11 @@ public class GcpLogSinkTest {
@Test
@SuppressWarnings("unchecked")
public void verifyWriteWithTags() {
+ when(mockConfig.getCustomTags()).thenReturn(CUSTOM_TAGS);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
- CUSTOM_TAGS, Collections.emptySet());
+ mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS);
- sink.write(LOG_PROTO);
+ sink.write(LOG_PROTO, null);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@@ -150,10 +161,11 @@ public class GcpLogSinkTest {
@SuppressWarnings("unchecked")
public void emptyCustomTags_labelsNotSet() {
Map<String, String> emptyCustomTags = null;
+ when(mockConfig.getCustomTags()).thenReturn(emptyCustomTags);
Map<String, String> expectedEmptyLabels = new HashMap<>();
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
- emptyCustomTags, Collections.emptySet());
- sink.write(LOG_PROTO);
+ mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
+ sink.write(LOG_PROTO, null);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@@ -169,12 +181,13 @@ public class GcpLogSinkTest {
@SuppressWarnings("unchecked")
public void emptyCustomTags_setSourceProject() {
Map<String, String> emptyCustomTags = null;
+ when(mockConfig.getCustomTags()).thenReturn(emptyCustomTags);
String projectId = "PROJECT";
Map<String, String> expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS,
projectId);
GcpLogSink sink = new GcpLogSink(mockLogging, projectId, LOCATION_TAGS,
- emptyCustomTags, Collections.emptySet());
- sink.write(LOG_PROTO);
+ mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
+ sink.write(LOG_PROTO, null);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@@ -189,8 +202,8 @@ public class GcpLogSinkTest {
@Test
public void verifyClose() throws Exception {
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
- CUSTOM_TAGS, Collections.emptySet());
- sink.write(LOG_PROTO);
+ mockConfig, Collections.emptySet(), new TraceLoggingHelper(DEST_PROJECT_NAME));
+ sink.write(LOG_PROTO, null);
verify(mockLogging, times(1)).write(anyIterable());
sink.close();
verify(mockLogging).close();
@@ -200,8 +213,106 @@ public class GcpLogSinkTest {
@Test
public void verifyExclude() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
- CUSTOM_TAGS, Collections.singleton("service"));
- mockSink.write(LOG_PROTO);
+ mockConfig, Collections.singleton("service"), new TraceLoggingHelper(DEST_PROJECT_NAME));
+ mockSink.write(LOG_PROTO, null);
verifyNoInteractions(mockLogging);
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void verifyNoTraceDataInLogs_withTraceDisabled() throws Exception {
+ SpanContext validSpanContext = SpanContext.create(
+ TraceId.fromLowerBase16("4c6af40c499951eb7de2777ba1e4fefa"),
+ SpanId.fromLowerBase16("de52e84d13dd232d"),
+ TraceOptions.builder().setIsSampled(true).build(),
+ Tracestate.builder().build());
+
+ TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(DEST_PROJECT_NAME);
+ when(mockConfig.isEnableCloudTracing()).thenReturn(false);
+ Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
+ mockConfig, Collections.emptySet(), traceLoggingHelper);
+ mockSink.write(LOG_PROTO, validSpanContext);
+
+ ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
+ (Class) Collection.class);
+ verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
+ for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
+ LogEntry entry = it.next();
+ assertThat(entry.getTrace()).isNull(); // Field not present
+ assertThat(entry.getSpanId()).isNull(); // Field not present
+ assertThat(entry.getTraceSampled()).isFalse(); // Default value
+ assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO);
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void verifyTraceDataInLogs_withValidSpanContext() throws Exception {
+ CharSequence traceIdSeq = "4c6af40c499951eb7de2777ba1e4fefa";
+ CharSequence spanIdSeq = "de52e84d13dd232d";
+ TraceId traceId = TraceId.fromLowerBase16(traceIdSeq);
+ SpanId spanId = SpanId.fromLowerBase16(spanIdSeq);
+ boolean traceSampled = true;
+
+ SpanContext validSpanContext = SpanContext.create(traceId, spanId,
+ TraceOptions.builder().setIsSampled(traceSampled).build(),
+ Tracestate.builder().build());
+
+ TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(DEST_PROJECT_NAME);
+ when(mockConfig.isEnableCloudTracing()).thenReturn(true);
+ Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
+ mockConfig, Collections.emptySet(), traceLoggingHelper);
+ mockSink.write(LOG_PROTO, validSpanContext);
+
+ String expectedTrace = "projects/" + DEST_PROJECT_NAME + "/traces/" + traceIdSeq;
+
+ ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
+ (Class) Collection.class);
+ verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
+ for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
+ LogEntry entry = it.next();
+ assertThat(entry.getTrace()).isEqualTo(expectedTrace);
+ assertThat(entry.getSpanId()).isEqualTo("" + spanIdSeq);
+ assertThat(entry.getTraceSampled()).isEqualTo(traceSampled);
+ assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO);
+ }
+
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void verifyTraceDataLogs_withNullSpanContext() throws Exception {
+ TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(DEST_PROJECT_NAME);
+ when(mockConfig.isEnableCloudTracing()).thenReturn(true);
+ Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
+ mockConfig, Collections.emptySet(), traceLoggingHelper);
+
+ String expectedTrace =
+ "projects/" + DEST_PROJECT_NAME + "/traces/00000000000000000000000000000000";
+ String expectedSpanId = "0000000000000000";
+
+ ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
+ (Class) Collection.class);
+
+ // Client log with default span context
+ mockSink.write(LOG_PROTO , SpanContext.INVALID);
+ verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
+ for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
+ LogEntry entry = it.next();
+ assertThat(entry.getTrace()).isEqualTo(expectedTrace);
+ assertThat(entry.getSpanId()).isEqualTo(expectedSpanId);
+ assertThat(entry.getTraceSampled()).isFalse();
+ }
+
+ // Server log
+ GrpcLogRecord serverLogProto = LOG_PROTO.toBuilder().setLogger(EventLogger.SERVER).build();
+ mockSink.write(serverLogProto , SpanContext.INVALID);
+ verify(mockLogging, times(2)).write(logEntrySetCaptor.capture());
+ for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
+ LogEntry entry = it.next();
+ assertThat(entry.getTrace()).isEqualTo(expectedTrace);
+ assertThat(entry.getSpanId()).isEqualTo(expectedSpanId);
+ assertThat(entry.getTraceSampled()).isFalse();
+ }
+ }
}
diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/TraceLoggingHelperTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/TraceLoggingHelperTest.java
new file mode 100644
index 000000000..893cf478c
--- /dev/null
+++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/TraceLoggingHelperTest.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.gcp.observability.logging;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.logging.LogEntry;
+import io.opencensus.trace.SpanContext;
+import io.opencensus.trace.SpanId;
+import io.opencensus.trace.TraceId;
+import io.opencensus.trace.TraceOptions;
+import io.opencensus.trace.Tracestate;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link TraceLoggingHelper}.
+ */
+@RunWith(JUnit4.class)
+public class TraceLoggingHelperTest {
+
+ private static final String PROJECT = "PROJECT";
+ private static final Tracestate EMPTY_TRACESTATE = Tracestate.builder().build();
+ private static TraceLoggingHelper traceLoggingHelper;
+
+ @Before
+ public void setUp() {
+ traceLoggingHelper = new TraceLoggingHelper(PROJECT);
+ }
+
+ @Test
+ public void enhanceLogEntry_AddSampledSpanContextToLogEntry() {
+ SpanContext spanContext = SpanContext.create(
+ TraceId.fromLowerBase16("5ce724c382c136b2a67bb447e6a6bd27"),
+ SpanId.fromLowerBase16("de52e84d13dd232d"),
+ TraceOptions.builder().setIsSampled(true).build(),
+ EMPTY_TRACESTATE);
+ LogEntry logEntry = getEnhancedLogEntry(traceLoggingHelper, spanContext);
+ assertThat(logEntry.getTraceSampled()).isTrue();
+ assertThat(logEntry.getTrace())
+ .isEqualTo("projects/PROJECT/traces/5ce724c382c136b2a67bb447e6a6bd27");
+ assertThat(logEntry.getSpanId()).isEqualTo("de52e84d13dd232d");
+ }
+
+ @Test
+ public void enhanceLogEntry_AddNonSampledSpanContextToLogEntry() {
+ SpanContext spanContext = SpanContext.create(
+ TraceId.fromLowerBase16("649a8a64db2d0c757fd06bb1bfe84e2c"),
+ SpanId.fromLowerBase16("731e102335b7a5a0"),
+ TraceOptions.builder().setIsSampled(false).build(),
+ EMPTY_TRACESTATE);
+ LogEntry logEntry = getEnhancedLogEntry(traceLoggingHelper, spanContext);
+ assertThat(logEntry.getTraceSampled()).isFalse();
+ assertThat(logEntry.getTrace())
+ .isEqualTo("projects/PROJECT/traces/649a8a64db2d0c757fd06bb1bfe84e2c");
+ assertThat(logEntry.getSpanId()).isEqualTo("731e102335b7a5a0");
+ }
+
+ @Test
+ public void enhanceLogEntry_AddBlankSpanContextToLogEntry() {
+ SpanContext spanContext = SpanContext.INVALID;
+ LogEntry logEntry = getEnhancedLogEntry(traceLoggingHelper, spanContext);
+ assertThat(logEntry.getTraceSampled()).isFalse();
+ assertThat(logEntry.getTrace())
+ .isEqualTo("projects/PROJECT/traces/00000000000000000000000000000000");
+ assertThat(logEntry.getSpanId()).isEqualTo("0000000000000000");
+ }
+
+ private static LogEntry getEnhancedLogEntry(TraceLoggingHelper traceLoggingHelper,
+ SpanContext spanContext) {
+ LogEntry.Builder logEntryBuilder = LogEntry.newBuilder(null);
+ traceLoggingHelper.enhanceLogEntry(logEntryBuilder, spanContext);
+ return logEntryBuilder.build();
+ }
+}