aboutsummaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2018-05-20 17:23:48 -0700
committerGitHub <noreply@github.com>2018-05-20 17:23:48 -0700
commit30478d88c722a5a25e74f80e43578cb265d0ccfb (patch)
tree06e0d7e2f70774fcb28de2c30ce4f871e506e865 /services
parentfaffb09f0a0398c92b99bff6e3e59ca7a8121a5e (diff)
downloadgrpc-grpc-java-30478d88c722a5a25e74f80e43578cb265d0ccfb.tar.gz
services: add temp file based binary log sink (#4404)
No need to use service provider for BinaryLogSink, it can just be an interface that is passed into BinaryLogProviderImpl. Add a default TempFileSink that uses the protobuf object's writeDelimited method to write to the output stream. Warning: TempFileSink blocks.
Diffstat (limited to 'services')
-rw-r--r--services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java31
-rw-r--r--services/src/main/java/io/grpc/services/BinaryLogSink.java18
-rw-r--r--services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java53
-rw-r--r--services/src/main/java/io/grpc/services/BinaryLogs.java5
-rw-r--r--services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java10
-rw-r--r--services/src/main/java/io/grpc/services/TempFileSink.java84
-rw-r--r--services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java13
-rw-r--r--services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java22
-rw-r--r--services/src/test/java/io/grpc/services/TempFileSinkTest.java69
9 files changed, 218 insertions, 87 deletions
diff --git a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java
index e66706a6b..907295ea3 100644
--- a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java
+++ b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java
@@ -16,12 +16,13 @@
package io.grpc.services;
+import com.google.common.base.Preconditions;
import io.grpc.BinaryLogProvider;
import io.grpc.CallOptions;
import io.grpc.ClientInterceptor;
import io.grpc.ServerInterceptor;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -31,22 +32,29 @@ import javax.annotation.Nullable;
class BinaryLogProviderImpl extends BinaryLogProvider {
private static final Logger logger = Logger.getLogger(BinaryLogProviderImpl.class.getName());
private final BinlogHelper.Factory factory;
+ private final BinaryLogSink sink;
private final AtomicLong counter = new AtomicLong();
- public BinaryLogProviderImpl() {
- this(BinaryLogSinkProvider.provider(), System.getenv("GRPC_BINARY_LOG_CONFIG"));
+ public BinaryLogProviderImpl() throws IOException {
+ this(new TempFileSink(), System.getenv("GRPC_BINARY_LOG_CONFIG"));
}
- BinaryLogProviderImpl(BinaryLogSink sink, String configStr) {
- BinlogHelper.Factory factory = null;
+ /**
+ * Creates an instance.
+ * @param sink ownership is transferred to this class.
+ * @param configStr config string to parse to determine logged methods and msg size limits.
+ * @throws IOException if initialization failed.
+ */
+ BinaryLogProviderImpl(BinaryLogSink sink, String configStr) throws IOException {
+ this.sink = Preconditions.checkNotNull(sink);
try {
factory = new BinlogHelper.FactoryImpl(sink, configStr);
} catch (RuntimeException e) {
- logger.log(Level.SEVERE, "Caught exception, binary log will be disabled", e);
- } catch (Error err) {
- logger.log(Level.SEVERE, "Caught exception, binary log will be disabled", err);
+ sink.close();
+ // parsing the conf string may throw if it is blank or contains errors
+ throw new IOException(
+ "Can not initialize. The env variable GRPC_BINARY_LOG_CONFIG must be valid.", e);
}
- this.factory = factory;
}
@Nullable
@@ -70,6 +78,11 @@ class BinaryLogProviderImpl extends BinaryLogProvider {
return helperForMethod.getClientInterceptor(getClientCallId(callOptions));
}
+ @Override
+ public void close() throws IOException {
+ sink.close();
+ }
+
protected CallId getServerCallId() {
return new CallId(0, counter.getAndIncrement());
}
diff --git a/services/src/main/java/io/grpc/services/BinaryLogSink.java b/services/src/main/java/io/grpc/services/BinaryLogSink.java
index 009573e49..ef5be01da 100644
--- a/services/src/main/java/io/grpc/services/BinaryLogSink.java
+++ b/services/src/main/java/io/grpc/services/BinaryLogSink.java
@@ -19,23 +19,9 @@ package io.grpc.services;
import com.google.protobuf.MessageLite;
import java.io.Closeable;
-abstract class BinaryLogSink implements Closeable {
+interface BinaryLogSink extends Closeable {
/**
* Writes the {@code message} to the destination.
*/
- public abstract void write(MessageLite message);
-
- /**
- * Whether this provider is available for use, taking the current environment into consideration.
- * If {@code false}, no other methods are safe to be called.
- */
- protected abstract boolean isAvailable();
-
- /**
- * A priority, from 0 to 10 that this provider should be used, taking the current environment into
- * consideration. 5 should be considered the default, and then tweaked based on environment
- * detection. A priority of 0 does not imply that the provider wouldn't work; just that it should
- * be last in line.
- */
- protected abstract int priority();
+ void write(MessageLite message);
}
diff --git a/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java b/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java
deleted file mode 100644
index 84ef86890..000000000
--- a/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2017 The gRPC Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.grpc.services;
-
-import io.grpc.InternalServiceProviders;
-import java.util.Collections;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Subclasses must be thread safe, and are responsible for writing the binary log message to
- * the appropriate destination.
- */
-@ThreadSafe
-final class BinaryLogSinkProvider {
- private static final BinaryLogSink INSTANCE = InternalServiceProviders.load(
- BinaryLogSink.class,
- Collections.<Class<?>>emptyList(),
- BinaryLogSinkProvider.class.getClassLoader(),
- new InternalServiceProviders.PriorityAccessor<BinaryLogSink>() {
- @Override
- public boolean isAvailable(BinaryLogSink provider) {
- return provider.isAvailable();
- }
-
- @Override
- public int getPriority(BinaryLogSink provider) {
- return provider.priority();
- }
- });
-
- /**
- * Returns the {@code BinaryLogSink} that should be used.
- */
- @Nullable
- static BinaryLogSink provider() {
- return INSTANCE;
- }
-}
diff --git a/services/src/main/java/io/grpc/services/BinaryLogs.java b/services/src/main/java/io/grpc/services/BinaryLogs.java
index 7f000cb80..61b83bcab 100644
--- a/services/src/main/java/io/grpc/services/BinaryLogs.java
+++ b/services/src/main/java/io/grpc/services/BinaryLogs.java
@@ -19,14 +19,15 @@ package io.grpc.services;
import io.grpc.BinaryLog;
import io.grpc.ExperimentalApi;
import io.grpc.InternalBinaryLogs;
+import java.io.IOException;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017")
public final class BinaryLogs {
- public static BinaryLog createBinaryLog() {
+ public static BinaryLog createBinaryLog() throws IOException {
return InternalBinaryLogs.createBinaryLog(new BinaryLogProviderImpl());
}
- public static BinaryLog createCensusBinaryLog() {
+ public static BinaryLog createCensusBinaryLog() throws IOException {
return InternalBinaryLogs.createBinaryLog(new CensusBinaryLogProvider());
}
diff --git a/services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java b/services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java
index 3bfe4c5d0..f255d67e8 100644
--- a/services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java
+++ b/services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java
@@ -20,9 +20,19 @@ import io.grpc.BinaryLogProvider;
import io.grpc.CallOptions;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;
+import java.io.IOException;
import java.nio.ByteBuffer;
final class CensusBinaryLogProvider extends BinaryLogProviderImpl {
+
+ public CensusBinaryLogProvider() throws IOException {
+ super();
+ }
+
+ CensusBinaryLogProvider(BinaryLogSink sink, String configStr) throws IOException {
+ super(sink, configStr);
+ }
+
@Override
protected CallId getServerCallId() {
Span currentSpan = Tracing.getTracer().getCurrentSpan();
diff --git a/services/src/main/java/io/grpc/services/TempFileSink.java b/services/src/main/java/io/grpc/services/TempFileSink.java
new file mode 100644
index 000000000..c28339d1b
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/TempFileSink.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import com.google.protobuf.MessageLite;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The output file goes to the JVM's temp dir with a prefix of BINARY_INFO. The proto messages
+ * are written serially using {@link MessageLite#writeDelimitedTo(OutputStream)}.
+ */
+class TempFileSink implements BinaryLogSink {
+ private static final Logger logger = Logger.getLogger(TempFileSink.class.getName());
+
+ private final String outPath;
+ private final OutputStream out;
+ private boolean closed;
+
+ TempFileSink() throws IOException {
+ File outFile = File.createTempFile("BINARY_INFO.", "");
+ outPath = outFile.getPath();
+ logger.log(Level.INFO, "Writing binary logs to to {0}", outFile.getAbsolutePath());
+ out = new BufferedOutputStream(new FileOutputStream(outFile));
+ }
+
+ String getPath() {
+ return this.outPath;
+ }
+
+ @Override
+ public synchronized void write(MessageLite message) {
+ if (closed) {
+ logger.log(Level.FINEST, "Attempt to write after TempFileSink is closed.");
+ return;
+ }
+ try {
+ message.writeDelimitedTo(out);
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "Caught exception while writing", e);
+ closeQuietly();
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ out.flush();
+ } finally {
+ out.close();
+ }
+ }
+
+ private synchronized void closeQuietly() {
+ try {
+ close();
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "Caught exception while closing", e);
+ }
+ }
+}
diff --git a/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java b/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java
index 7dbd58504..238f5fbc0 100644
--- a/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java
+++ b/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2018, gRPC Authors All rights reserved.
+ * Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@ package io.grpc.services;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import io.grpc.CallOptions;
import org.junit.Test;
@@ -42,4 +44,13 @@ public class BinaryLogProviderImplTest {
assertNull(binlog.getServerInterceptor("package.service/method"));
assertNull(binlog.getClientInterceptor("package.service/method", CallOptions.DEFAULT));
}
+
+ @Test
+ public void closeTest() throws Exception {
+ BinaryLogSink sink = mock(BinaryLogSink.class);
+ BinaryLogProviderImpl log = new BinaryLogProviderImpl(sink, "*");
+ verify(sink, never()).close();
+ log.close();
+ verify(sink).close();
+ }
}
diff --git a/services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java b/services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java
index 09d5b5903..f2e9621ca 100644
--- a/services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java
+++ b/services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java
@@ -24,29 +24,39 @@ import io.grpc.BinaryLogProvider.CallId;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
-import io.grpc.services.CensusBinaryLogProvider;
import java.nio.ByteBuffer;
import java.util.Random;
+import java.util.concurrent.Callable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
/**
* Tests for {@link CensusBinaryLogProvider}.
*/
@RunWith(JUnit4.class)
public class CensusBinaryLogProviderTest {
+ @Mock
+ private BinaryLogSink sink;
+
+ public CensusBinaryLogProviderTest() {
+ MockitoAnnotations.initMocks(this);
+ }
+
@Test
- public void serverCallIdFromCensus() {
+ public void serverCallIdFromCensus() throws Exception {
final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan);
- context.run(new Runnable() {
+ context.call(new Callable<Void>() {
@Override
- public void run() {
- CallId callId = new CensusBinaryLogProvider().getServerCallId();
+ public Void call() throws Exception {
+ CallId callId = new CensusBinaryLogProvider(sink, "*").getServerCallId();
assertThat(callId.hi).isEqualTo(0);
assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong())
.isEqualTo(callId.lo);
+ return null;
}
});
}
@@ -54,7 +64,7 @@ public class CensusBinaryLogProviderTest {
@Test
public void clientCallId() throws Exception {
CallId expected = new CallId(1234, 5677);
- CallId actual = new CensusBinaryLogProvider()
+ CallId actual = new CensusBinaryLogProvider(sink, "*")
.getClientCallId(
CallOptions.DEFAULT.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY,
diff --git a/services/src/test/java/io/grpc/services/TempFileSinkTest.java b/services/src/test/java/io/grpc/services/TempFileSinkTest.java
new file mode 100644
index 000000000..05ae4887e
--- /dev/null
+++ b/services/src/test/java/io/grpc/services/TempFileSinkTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import static org.junit.Assert.assertEquals;
+
+import io.grpc.binarylog.GrpcLogEntry;
+import io.grpc.binarylog.Uint128;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link io.grpc.services.TempFileSink}.
+ */
+@RunWith(JUnit4.class)
+public class TempFileSinkTest {
+ @Test
+ public void readMyWrite() throws Exception {
+ TempFileSink sink = new TempFileSink();
+ GrpcLogEntry e1 = GrpcLogEntry.newBuilder()
+ .setCallId(Uint128.newBuilder().setLow(1234))
+ .build();
+ GrpcLogEntry e2 = GrpcLogEntry.newBuilder()
+ .setCallId(Uint128.newBuilder().setLow(5678))
+ .build();
+ sink.write(e1);
+ sink.write(e2);
+ sink.close();
+
+ DataInputStream input = new DataInputStream(new FileInputStream(sink.getPath()));
+ try {
+ GrpcLogEntry read1 = GrpcLogEntry.parseDelimitedFrom(input);
+ GrpcLogEntry read2 = GrpcLogEntry.parseDelimitedFrom(input);
+
+ assertEquals(e1, read1);
+ assertEquals(e2, read2);
+ assertEquals(-1, input.read());
+ } finally {
+ input.close();
+ }
+ }
+
+ @Test
+ public void writeAfterCloseIsSilent() throws IOException {
+ TempFileSink sink = new TempFileSink();
+ sink.close();
+ sink.write(GrpcLogEntry.newBuilder()
+ .setCallId(Uint128.newBuilder().setLow(1234))
+ .build());
+ }
+}