diff options
author | zpencer <spencerfang@google.com> | 2018-05-20 17:23:48 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-05-20 17:23:48 -0700 |
commit | 30478d88c722a5a25e74f80e43578cb265d0ccfb (patch) | |
tree | 06e0d7e2f70774fcb28de2c30ce4f871e506e865 /services | |
parent | faffb09f0a0398c92b99bff6e3e59ca7a8121a5e (diff) | |
download | grpc-grpc-java-30478d88c722a5a25e74f80e43578cb265d0ccfb.tar.gz |
services: add temp file based binary log sink (#4404)
No need to use service provider for BinaryLogSink, it can just be an
interface that is passed into BinaryLogProviderImpl.
Add a default TempFileSink that uses the protobuf object's
writeDelimited method to write to the output stream.
Warning: TempFileSink blocks.
Diffstat (limited to 'services')
9 files changed, 218 insertions, 87 deletions
diff --git a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java index e66706a6b..907295ea3 100644 --- a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java +++ b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java @@ -16,12 +16,13 @@ package io.grpc.services; +import com.google.common.base.Preconditions; import io.grpc.BinaryLogProvider; import io.grpc.CallOptions; import io.grpc.ClientInterceptor; import io.grpc.ServerInterceptor; +import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -31,22 +32,29 @@ import javax.annotation.Nullable; class BinaryLogProviderImpl extends BinaryLogProvider { private static final Logger logger = Logger.getLogger(BinaryLogProviderImpl.class.getName()); private final BinlogHelper.Factory factory; + private final BinaryLogSink sink; private final AtomicLong counter = new AtomicLong(); - public BinaryLogProviderImpl() { - this(BinaryLogSinkProvider.provider(), System.getenv("GRPC_BINARY_LOG_CONFIG")); + public BinaryLogProviderImpl() throws IOException { + this(new TempFileSink(), System.getenv("GRPC_BINARY_LOG_CONFIG")); } - BinaryLogProviderImpl(BinaryLogSink sink, String configStr) { - BinlogHelper.Factory factory = null; + /** + * Creates an instance. + * @param sink ownership is transferred to this class. + * @param configStr config string to parse to determine logged methods and msg size limits. + * @throws IOException if initialization failed. + */ + BinaryLogProviderImpl(BinaryLogSink sink, String configStr) throws IOException { + this.sink = Preconditions.checkNotNull(sink); try { factory = new BinlogHelper.FactoryImpl(sink, configStr); } catch (RuntimeException e) { - logger.log(Level.SEVERE, "Caught exception, binary log will be disabled", e); - } catch (Error err) { - logger.log(Level.SEVERE, "Caught exception, binary log will be disabled", err); + sink.close(); + // parsing the conf string may throw if it is blank or contains errors + throw new IOException( + "Can not initialize. The env variable GRPC_BINARY_LOG_CONFIG must be valid.", e); } - this.factory = factory; } @Nullable @@ -70,6 +78,11 @@ class BinaryLogProviderImpl extends BinaryLogProvider { return helperForMethod.getClientInterceptor(getClientCallId(callOptions)); } + @Override + public void close() throws IOException { + sink.close(); + } + protected CallId getServerCallId() { return new CallId(0, counter.getAndIncrement()); } diff --git a/services/src/main/java/io/grpc/services/BinaryLogSink.java b/services/src/main/java/io/grpc/services/BinaryLogSink.java index 009573e49..ef5be01da 100644 --- a/services/src/main/java/io/grpc/services/BinaryLogSink.java +++ b/services/src/main/java/io/grpc/services/BinaryLogSink.java @@ -19,23 +19,9 @@ package io.grpc.services; import com.google.protobuf.MessageLite; import java.io.Closeable; -abstract class BinaryLogSink implements Closeable { +interface BinaryLogSink extends Closeable { /** * Writes the {@code message} to the destination. */ - public abstract void write(MessageLite message); - - /** - * Whether this provider is available for use, taking the current environment into consideration. - * If {@code false}, no other methods are safe to be called. - */ - protected abstract boolean isAvailable(); - - /** - * A priority, from 0 to 10 that this provider should be used, taking the current environment into - * consideration. 5 should be considered the default, and then tweaked based on environment - * detection. A priority of 0 does not imply that the provider wouldn't work; just that it should - * be last in line. - */ - protected abstract int priority(); + void write(MessageLite message); } diff --git a/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java b/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java deleted file mode 100644 index 84ef86890..000000000 --- a/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2017 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.services; - -import io.grpc.InternalServiceProviders; -import java.util.Collections; -import javax.annotation.Nullable; -import javax.annotation.concurrent.ThreadSafe; - -/** - * Subclasses must be thread safe, and are responsible for writing the binary log message to - * the appropriate destination. - */ -@ThreadSafe -final class BinaryLogSinkProvider { - private static final BinaryLogSink INSTANCE = InternalServiceProviders.load( - BinaryLogSink.class, - Collections.<Class<?>>emptyList(), - BinaryLogSinkProvider.class.getClassLoader(), - new InternalServiceProviders.PriorityAccessor<BinaryLogSink>() { - @Override - public boolean isAvailable(BinaryLogSink provider) { - return provider.isAvailable(); - } - - @Override - public int getPriority(BinaryLogSink provider) { - return provider.priority(); - } - }); - - /** - * Returns the {@code BinaryLogSink} that should be used. - */ - @Nullable - static BinaryLogSink provider() { - return INSTANCE; - } -} diff --git a/services/src/main/java/io/grpc/services/BinaryLogs.java b/services/src/main/java/io/grpc/services/BinaryLogs.java index 7f000cb80..61b83bcab 100644 --- a/services/src/main/java/io/grpc/services/BinaryLogs.java +++ b/services/src/main/java/io/grpc/services/BinaryLogs.java @@ -19,14 +19,15 @@ package io.grpc.services; import io.grpc.BinaryLog; import io.grpc.ExperimentalApi; import io.grpc.InternalBinaryLogs; +import java.io.IOException; @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017") public final class BinaryLogs { - public static BinaryLog createBinaryLog() { + public static BinaryLog createBinaryLog() throws IOException { return InternalBinaryLogs.createBinaryLog(new BinaryLogProviderImpl()); } - public static BinaryLog createCensusBinaryLog() { + public static BinaryLog createCensusBinaryLog() throws IOException { return InternalBinaryLogs.createBinaryLog(new CensusBinaryLogProvider()); } diff --git a/services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java b/services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java index 3bfe4c5d0..f255d67e8 100644 --- a/services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java +++ b/services/src/main/java/io/grpc/services/CensusBinaryLogProvider.java @@ -20,9 +20,19 @@ import io.grpc.BinaryLogProvider; import io.grpc.CallOptions; import io.opencensus.trace.Span; import io.opencensus.trace.Tracing; +import java.io.IOException; import java.nio.ByteBuffer; final class CensusBinaryLogProvider extends BinaryLogProviderImpl { + + public CensusBinaryLogProvider() throws IOException { + super(); + } + + CensusBinaryLogProvider(BinaryLogSink sink, String configStr) throws IOException { + super(sink, configStr); + } + @Override protected CallId getServerCallId() { Span currentSpan = Tracing.getTracer().getCurrentSpan(); diff --git a/services/src/main/java/io/grpc/services/TempFileSink.java b/services/src/main/java/io/grpc/services/TempFileSink.java new file mode 100644 index 000000000..c28339d1b --- /dev/null +++ b/services/src/main/java/io/grpc/services/TempFileSink.java @@ -0,0 +1,84 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.protobuf.MessageLite; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The output file goes to the JVM's temp dir with a prefix of BINARY_INFO. The proto messages + * are written serially using {@link MessageLite#writeDelimitedTo(OutputStream)}. + */ +class TempFileSink implements BinaryLogSink { + private static final Logger logger = Logger.getLogger(TempFileSink.class.getName()); + + private final String outPath; + private final OutputStream out; + private boolean closed; + + TempFileSink() throws IOException { + File outFile = File.createTempFile("BINARY_INFO.", ""); + outPath = outFile.getPath(); + logger.log(Level.INFO, "Writing binary logs to to {0}", outFile.getAbsolutePath()); + out = new BufferedOutputStream(new FileOutputStream(outFile)); + } + + String getPath() { + return this.outPath; + } + + @Override + public synchronized void write(MessageLite message) { + if (closed) { + logger.log(Level.FINEST, "Attempt to write after TempFileSink is closed."); + return; + } + try { + message.writeDelimitedTo(out); + } catch (IOException e) { + logger.log(Level.SEVERE, "Caught exception while writing", e); + closeQuietly(); + } + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + closed = true; + try { + out.flush(); + } finally { + out.close(); + } + } + + private synchronized void closeQuietly() { + try { + close(); + } catch (IOException e) { + logger.log(Level.SEVERE, "Caught exception while closing", e); + } + } +} diff --git a/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java b/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java index 7dbd58504..238f5fbc0 100644 --- a/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java +++ b/services/src/test/java/io/grpc/services/BinaryLogProviderImplTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018, gRPC Authors All rights reserved. + * Copyright 2018 The gRPC Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ package io.grpc.services; import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import io.grpc.CallOptions; import org.junit.Test; @@ -42,4 +44,13 @@ public class BinaryLogProviderImplTest { assertNull(binlog.getServerInterceptor("package.service/method")); assertNull(binlog.getClientInterceptor("package.service/method", CallOptions.DEFAULT)); } + + @Test + public void closeTest() throws Exception { + BinaryLogSink sink = mock(BinaryLogSink.class); + BinaryLogProviderImpl log = new BinaryLogProviderImpl(sink, "*"); + verify(sink, never()).close(); + log.close(); + verify(sink).close(); + } } diff --git a/services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java b/services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java index 09d5b5903..f2e9621ca 100644 --- a/services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java +++ b/services/src/test/java/io/grpc/services/CensusBinaryLogProviderTest.java @@ -24,29 +24,39 @@ import io.grpc.BinaryLogProvider.CallId; import io.grpc.CallOptions; import io.grpc.Context; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; -import io.grpc.services.CensusBinaryLogProvider; import java.nio.ByteBuffer; import java.util.Random; +import java.util.concurrent.Callable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; /** * Tests for {@link CensusBinaryLogProvider}. */ @RunWith(JUnit4.class) public class CensusBinaryLogProviderTest { + @Mock + private BinaryLogSink sink; + + public CensusBinaryLogProviderTest() { + MockitoAnnotations.initMocks(this); + } + @Test - public void serverCallIdFromCensus() { + public void serverCallIdFromCensus() throws Exception { final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan); - context.run(new Runnable() { + context.call(new Callable<Void>() { @Override - public void run() { - CallId callId = new CensusBinaryLogProvider().getServerCallId(); + public Void call() throws Exception { + CallId callId = new CensusBinaryLogProvider(sink, "*").getServerCallId(); assertThat(callId.hi).isEqualTo(0); assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()) .isEqualTo(callId.lo); + return null; } }); } @@ -54,7 +64,7 @@ public class CensusBinaryLogProviderTest { @Test public void clientCallId() throws Exception { CallId expected = new CallId(1234, 5677); - CallId actual = new CensusBinaryLogProvider() + CallId actual = new CensusBinaryLogProvider(sink, "*") .getClientCallId( CallOptions.DEFAULT.withOption( BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, diff --git a/services/src/test/java/io/grpc/services/TempFileSinkTest.java b/services/src/test/java/io/grpc/services/TempFileSinkTest.java new file mode 100644 index 000000000..05ae4887e --- /dev/null +++ b/services/src/test/java/io/grpc/services/TempFileSinkTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static org.junit.Assert.assertEquals; + +import io.grpc.binarylog.GrpcLogEntry; +import io.grpc.binarylog.Uint128; +import java.io.DataInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link io.grpc.services.TempFileSink}. + */ +@RunWith(JUnit4.class) +public class TempFileSinkTest { + @Test + public void readMyWrite() throws Exception { + TempFileSink sink = new TempFileSink(); + GrpcLogEntry e1 = GrpcLogEntry.newBuilder() + .setCallId(Uint128.newBuilder().setLow(1234)) + .build(); + GrpcLogEntry e2 = GrpcLogEntry.newBuilder() + .setCallId(Uint128.newBuilder().setLow(5678)) + .build(); + sink.write(e1); + sink.write(e2); + sink.close(); + + DataInputStream input = new DataInputStream(new FileInputStream(sink.getPath())); + try { + GrpcLogEntry read1 = GrpcLogEntry.parseDelimitedFrom(input); + GrpcLogEntry read2 = GrpcLogEntry.parseDelimitedFrom(input); + + assertEquals(e1, read1); + assertEquals(e2, read2); + assertEquals(-1, input.read()); + } finally { + input.close(); + } + } + + @Test + public void writeAfterCloseIsSilent() throws IOException { + TempFileSink sink = new TempFileSink(); + sink.close(); + sink.write(GrpcLogEntry.newBuilder() + .setCallId(Uint128.newBuilder().setLow(1234)) + .build()); + } +} |