From bde2ba2444670764ec9c549f9adde220638c5ef4 Mon Sep 17 00:00:00 2001 From: zpencer Date: Fri, 16 Feb 2018 13:47:31 -0800 Subject: services: introduce BinaryLogSinkProvider (#3917) The `BinaryLog` will write `GrpcLogEntry`s to the sink, which is intended as a pluggable interface. It is the sink's responsibility to send the binlog protos to disk, to a remote logging service, etc. --- .../src/main/java/io/grpc/services/BinaryLog.java | 134 ++++++- .../main/java/io/grpc/services/BinaryLogSink.java | 41 ++ .../io/grpc/services/BinaryLogSinkProvider.java | 53 +++ .../test/java/io/grpc/services/BinaryLogTest.java | 440 ++++++++++++++++----- 4 files changed, 552 insertions(+), 116 deletions(-) create mode 100644 services/src/main/java/io/grpc/services/BinaryLogSink.java create mode 100644 services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java (limited to 'services') diff --git a/services/src/main/java/io/grpc/services/BinaryLog.java b/services/src/main/java/io/grpc/services/BinaryLog.java index 522051bb0..6523ebf64 100644 --- a/services/src/main/java/io/grpc/services/BinaryLog.java +++ b/services/src/main/java/io/grpc/services/BinaryLog.java @@ -25,12 +25,15 @@ import com.google.protobuf.ByteString; import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.binarylog.GrpcLogEntry; +import io.grpc.binarylog.GrpcLogEntry.Type; import io.grpc.binarylog.Message; import io.grpc.binarylog.Metadata.Builder; import io.grpc.binarylog.MetadataEntry; import io.grpc.binarylog.Peer; import io.grpc.binarylog.Peer.PeerType; import io.grpc.binarylog.Uint128; +import java.io.InputStream; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; @@ -46,25 +49,115 @@ import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; /** * A binary log class that is configured for a specific {@link MethodDescriptor}. */ +// TODO(zpencer): this is really a per-method logger class, make the class name more clear. +@ThreadSafe final class BinaryLog { private static final Logger logger = Logger.getLogger(BinaryLog.class.getName()); private static final int IP_PORT_BYTES = 2; private static final int IP_PORT_UPPER_MASK = 0xff00; private static final int IP_PORT_LOWER_MASK = 0xff; - private final int maxHeaderBytes; - private final int maxMessageBytes; + private final BinaryLogSink sink; + @VisibleForTesting + final int maxHeaderBytes; + @VisibleForTesting + final int maxMessageBytes; @VisibleForTesting - BinaryLog(int maxHeaderBytes, int maxMessageBytes) { + BinaryLog(BinaryLogSink sink, int maxHeaderBytes, int maxMessageBytes) { + this.sink = sink; this.maxHeaderBytes = maxHeaderBytes; this.maxMessageBytes = maxMessageBytes; } + /** + * Logs the sending of initial metadata. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + public void logSendInitialMetadata( + Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket) { + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(Type.SEND_INITIAL_METADATA) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setPeer(socketToProto(peerSocket)) + .setMetadata(metadataToProto(metadata, maxHeaderBytes)) + .build(); + sink.write(entry); + } + + /** + * Logs the receiving of initial metadata. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + public void logRecvInitialMetadata( + Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket) { + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(Type.RECV_INITIAL_METADATA) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setPeer(socketToProto(peerSocket)) + .setMetadata(metadataToProto(metadata, maxHeaderBytes)) + .build(); + sink.write(entry); + } + + /** + * Logs the trailing metadata. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + public void logTrailingMetadata(Metadata metadata, boolean isServer, byte[] callId) { + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setMetadata(metadataToProto(metadata, maxHeaderBytes)) + .build(); + sink.write(entry); + } + + /** + * Logs the outbound message. This method logs the appropriate number of bytes from + * {@code message}, and returns an {@link InputStream} that contains the original message. + * The number of bytes logged is determined by the binary logging configuration. + */ + public void logOutboundMessage( + ByteBuffer message, boolean compressed, boolean isServer, byte[] callId) { + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(Type.SEND_MESSAGE) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setMessage(messageToProto(message, compressed, maxMessageBytes)) + .build(); + sink.write(entry); + } + + /** + * Logs the inbound message. This method logs the appropriate number of bytes from + * {@code message}, and returns an {@link InputStream} that contains the original message. + * The number of bytes logged is determined by the binary logging configuration. + */ + public void logInboundMessage( + ByteBuffer message, boolean compressed, boolean isServer, byte[] callId) { + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(Type.RECV_MESSAGE) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setMessage(messageToProto(message, compressed, maxMessageBytes)) + .build(); + sink.write(entry); + } + @Override public boolean equals(Object o) { if (!(o instanceof BinaryLog)) { @@ -72,7 +165,8 @@ final class BinaryLog { } BinaryLog that = (BinaryLog) o; return this.maxHeaderBytes == that.maxHeaderBytes - && this.maxMessageBytes == that.maxMessageBytes; + && this.maxMessageBytes == that.maxMessageBytes + && this.sink.equals(that.sink); } @Override @@ -84,7 +178,9 @@ final class BinaryLog { public String toString() { return getClass().getSimpleName() + '[' + "maxHeaderBytes=" + maxHeaderBytes + ", " - + "maxMessageBytes=" + maxMessageBytes + "]"; + + "maxMessageBytes=" + maxMessageBytes + + "sink=" + sink + + "]"; } private static final Factory DEFAULT_FACTORY; @@ -94,8 +190,10 @@ final class BinaryLog { Factory defaultFactory = NULL_FACTORY; try { String configStr = System.getenv("GRPC_BINARY_LOG_CONFIG"); - if (configStr != null && configStr.length() > 0) { - defaultFactory = new FactoryImpl(configStr); + // TODO(zpencer): make BinaryLog.java implement isAvailable, and put this check there + BinaryLogSink sink = BinaryLogSinkProvider.provider(); + if (sink != null && configStr != null && configStr.length() > 0) { + defaultFactory = new FactoryImpl(sink, configStr); } } catch (Throwable t) { logger.log(Level.SEVERE, "Failed to initialize binary log. Disabling binary log.", t); @@ -140,7 +238,7 @@ final class BinaryLog { * Accepts a string in the format specified by the binary log spec. */ @VisibleForTesting - FactoryImpl(String configurationString) { + FactoryImpl(BinaryLogSink sink, String configurationString) { Preconditions.checkState(configurationString != null && configurationString.length() > 0); BinaryLog globalLog = null; Map perServiceLogs = new HashMap(); @@ -153,7 +251,7 @@ final class BinaryLog { } String methodOrSvc = configMatcher.group(1); String binlogOptionStr = configMatcher.group(2); - BinaryLog binLog = createBinaryLog(binlogOptionStr); + BinaryLog binLog = createBinaryLog(sink, binlogOptionStr); if (binLog == null) { continue; } @@ -215,9 +313,9 @@ final class BinaryLog { */ @VisibleForTesting @Nullable - static BinaryLog createBinaryLog(@Nullable String logConfig) { + static BinaryLog createBinaryLog(BinaryLogSink sink, @Nullable String logConfig) { if (logConfig == null) { - return new BinaryLog(Integer.MAX_VALUE, Integer.MAX_VALUE); + return new BinaryLog(sink, Integer.MAX_VALUE, Integer.MAX_VALUE); } try { Matcher headerMatcher; @@ -244,7 +342,7 @@ final class BinaryLog { logger.log(Level.SEVERE, "Illegal log config pattern: " + logConfig); return null; } - return new BinaryLog(maxHeaderBytes, maxMsgBytes); + return new BinaryLog(sink, maxHeaderBytes, maxMsgBytes); } catch (NumberFormatException e) { logger.log(Level.SEVERE, "Illegal log config pattern: " + logConfig); return null; @@ -341,15 +439,17 @@ final class BinaryLog { } @VisibleForTesting - static Message messageToProto(byte[] message, boolean compressed, int maxMessageBytes) { - Preconditions.checkState(maxMessageBytes >= 0); + static Message messageToProto(ByteBuffer message, boolean compressed, int maxMessageBytes) { + int messageSize = message.remaining(); Message.Builder builder = Message .newBuilder() .setFlags(flagsForMessage(compressed)) - .setLength(message.length); + .setLength(messageSize); if (maxMessageBytes > 0) { - int limit = Math.min(maxMessageBytes, message.length); - builder.setData(ByteString.copyFrom(message, 0, limit)); + int desiredRemaining = Math.min(maxMessageBytes, messageSize); + ByteBuffer dup = message.duplicate(); + dup.limit(dup.position() + desiredRemaining); + builder.setData(ByteString.copyFrom(dup)); } return builder.build(); } diff --git a/services/src/main/java/io/grpc/services/BinaryLogSink.java b/services/src/main/java/io/grpc/services/BinaryLogSink.java new file mode 100644 index 000000000..5e60bedc4 --- /dev/null +++ b/services/src/main/java/io/grpc/services/BinaryLogSink.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.protobuf.MessageLite; +import java.io.Closeable; + +public abstract class BinaryLogSink implements Closeable { + /** + * Writes the {@code message} to the destination. + */ + public abstract void write(MessageLite message); + + /** + * Whether this provider is available for use, taking the current environment into consideration. + * If {@code false}, no other methods are safe to be called. + */ + protected abstract boolean isAvailable(); + + /** + * A priority, from 0 to 10 that this provider should be used, taking the current environment into + * consideration. 5 should be considered the default, and then tweaked based on environment + * detection. A priority of 0 does not imply that the provider wouldn't work; just that it should + * be last in line. + */ + protected abstract int priority(); +} diff --git a/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java b/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java new file mode 100644 index 000000000..13aab5133 --- /dev/null +++ b/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import io.grpc.InternalServiceProviders; +import java.util.Collections; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Subclasses must be thread safe, and are responsible for writing the binary log message to + * the appropriate destination. + */ +@ThreadSafe +final class BinaryLogSinkProvider { + private static final BinaryLogSink INSTANCE = InternalServiceProviders.load( + BinaryLogSink.class, + Collections.>emptyList(), + BinaryLogSinkProvider.class.getClassLoader(), + new InternalServiceProviders.PriorityAccessor() { + @Override + public boolean isAvailable(BinaryLogSink provider) { + return provider.isAvailable(); + } + + @Override + public int getPriority(BinaryLogSink provider) { + return provider.priority(); + } + }); + + /** + * Returns the {@code BinaryLogSink} that should be used. + */ + @Nullable + static BinaryLogSink provider() { + return INSTANCE; + } +} diff --git a/services/src/test/java/io/grpc/services/BinaryLogTest.java b/services/src/test/java/io/grpc/services/BinaryLogTest.java index de95d1558..287a94f74 100644 --- a/services/src/test/java/io/grpc/services/BinaryLogTest.java +++ b/services/src/test/java/io/grpc/services/BinaryLogTest.java @@ -20,10 +20,15 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.primitives.Bytes; import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; import io.grpc.Metadata; +import io.grpc.binarylog.GrpcLogEntry; import io.grpc.binarylog.Message; import io.grpc.binarylog.MetadataEntry; import io.grpc.binarylog.Peer; @@ -64,7 +69,7 @@ public final class BinaryLogTest { private static final Metadata.Key KEY_C = Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER); private static final MetadataEntry ENTRY_A = - MetadataEntry + MetadataEntry .newBuilder() .setKey(ByteString.copyFrom(KEY_A.name(), US_ASCII)) .setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII))) @@ -81,10 +86,20 @@ public final class BinaryLogTest { .setKey(ByteString.copyFrom(KEY_C.name(), US_ASCII)) .setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII))) .build(); + private static final boolean IS_SERVER = true; + private static final boolean IS_CLIENT = false; private static final boolean IS_COMPRESSED = true; private static final boolean IS_UNCOMPRESSED = false; + private static final byte[] CALL_ID = new byte[] { + 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, + 0x19, 0x10, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f }; + private static final int HEADER_LIMIT = 10; + private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; private final Metadata metadata = new Metadata(); + private final BinaryLogSink sink = mock(BinaryLogSink.class); + private final BinaryLog binaryLog = new BinaryLog(sink, HEADER_LIMIT, MESSAGE_LIMIT); + private final ByteBuffer message = ByteBuffer.wrap(new byte[100]); @Before public void setUp() throws Exception { @@ -95,169 +110,169 @@ public final class BinaryLogTest { @Test public void configBinLog_global() throws Exception { - assertEquals(BOTH_FULL, new FactoryImpl("*").getLog("p.s/m")); - assertEquals(BOTH_FULL, new FactoryImpl("*{h;m}").getLog("p.s/m")); - assertEquals(HEADER_FULL, new FactoryImpl("*{h}").getLog("p.s/m")); - assertEquals(MSG_FULL, new FactoryImpl("*{m}").getLog("p.s/m")); - assertEquals(HEADER_256, new FactoryImpl("*{h:256}").getLog("p.s/m")); - assertEquals(MSG_256, new FactoryImpl("*{m:256}").getLog("p.s/m")); - assertEquals(BOTH_256, new FactoryImpl("*{h:256;m:256}").getLog("p.s/m")); - assertEquals( + assertSameLimits(BOTH_FULL, makeLog("*", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("*{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("*{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("*{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("*{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("*{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("*{h:256;m:256}", "p.s/m")); + assertSameLimits( new Builder().header(Integer.MAX_VALUE).msg(256).build(), - new FactoryImpl("*{h;m:256}").getLog("p.s/m")); - assertEquals( + makeLog("*{h;m:256}", "p.s/m")); + assertSameLimits( new Builder().header(256).msg(Integer.MAX_VALUE).build(), - new FactoryImpl("*{h:256;m}").getLog("p.s/m")); + makeLog("*{h:256;m}", "p.s/m")); } @Test public void configBinLog_method() throws Exception { - assertEquals(BOTH_FULL, new FactoryImpl("p.s/m").getLog("p.s/m")); - assertEquals(BOTH_FULL, new FactoryImpl("p.s/m{h;m}").getLog("p.s/m")); - assertEquals(HEADER_FULL, new FactoryImpl("p.s/m{h}").getLog("p.s/m")); - assertEquals(MSG_FULL, new FactoryImpl("p.s/m{m}").getLog("p.s/m")); - assertEquals(HEADER_256, new FactoryImpl("p.s/m{h:256}").getLog("p.s/m")); - assertEquals(MSG_256, new FactoryImpl("p.s/m{m:256}").getLog("p.s/m")); - assertEquals(BOTH_256, new FactoryImpl("p.s/m{h:256;m:256}").getLog("p.s/m")); - assertEquals( + assertSameLimits(BOTH_FULL, makeLog("p.s/m", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("p.s/m{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("p.s/m{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("p.s/m{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("p.s/m{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("p.s/m{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("p.s/m{h:256;m:256}", "p.s/m")); + assertSameLimits( new Builder().header(Integer.MAX_VALUE).msg(256).build(), - new FactoryImpl("p.s/m{h;m:256}").getLog("p.s/m")); - assertEquals( + makeLog("p.s/m{h;m:256}", "p.s/m")); + assertSameLimits( new Builder().header(256).msg(Integer.MAX_VALUE).build(), - new FactoryImpl("p.s/m{h:256;m}").getLog("p.s/m")); + makeLog("p.s/m{h:256;m}", "p.s/m")); } @Test public void configBinLog_method_absent() throws Exception { - assertNull(new FactoryImpl("p.s/m").getLog("p.s/absent")); + assertNull(makeLog("p.s/m", "p.s/absent")); } @Test public void configBinLog_service() throws Exception { - assertEquals(BOTH_FULL, new FactoryImpl("p.s/*").getLog("p.s/m")); - assertEquals(BOTH_FULL, new FactoryImpl("p.s/*{h;m}").getLog("p.s/m")); - assertEquals(HEADER_FULL, new FactoryImpl("p.s/*{h}").getLog("p.s/m")); - assertEquals(MSG_FULL, new FactoryImpl("p.s/*{m}").getLog("p.s/m")); - assertEquals(HEADER_256, new FactoryImpl("p.s/*{h:256}").getLog("p.s/m")); - assertEquals(MSG_256, new FactoryImpl("p.s/*{m:256}").getLog("p.s/m")); - assertEquals(BOTH_256, new FactoryImpl("p.s/*{h:256;m:256}").getLog("p.s/m")); - assertEquals( + assertSameLimits(BOTH_FULL, makeLog("p.s/*", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("p.s/*{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("p.s/*{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("p.s/*{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("p.s/*{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("p.s/*{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("p.s/*{h:256;m:256}", "p.s/m")); + assertSameLimits( new Builder().header(Integer.MAX_VALUE).msg(256).build(), - new FactoryImpl("p.s/*{h;m:256}").getLog("p.s/m")); - assertEquals( + makeLog("p.s/*{h;m:256}", "p.s/m")); + assertSameLimits( new Builder().header(256).msg(Integer.MAX_VALUE).build(), - new FactoryImpl("p.s/*{h:256;m}").getLog("p.s/m")); + makeLog("p.s/*{h:256;m}", "p.s/m")); } @Test public void configBinLog_service_absent() throws Exception { - assertNull(new FactoryImpl("p.s/*").getLog("p.other/m")); + assertNull(makeLog("p.s/*", "p.other/m")); } @Test public void createLogFromOptionString() throws Exception { - assertEquals(BOTH_FULL, FactoryImpl.createBinaryLog(/*logConfig=*/ null)); - assertEquals(HEADER_FULL, FactoryImpl.createBinaryLog("{h}")); - assertEquals(MSG_FULL, FactoryImpl.createBinaryLog("{m}")); - assertEquals(HEADER_256, FactoryImpl.createBinaryLog("{h:256}")); - assertEquals(MSG_256, FactoryImpl.createBinaryLog("{m:256}")); - assertEquals(BOTH_256, FactoryImpl.createBinaryLog("{h:256;m:256}")); - assertEquals( + assertSameLimits(BOTH_FULL, makeLog(null)); + assertSameLimits(HEADER_FULL, makeLog("{h}")); + assertSameLimits(MSG_FULL, makeLog("{m}")); + assertSameLimits(HEADER_256, makeLog("{h:256}")); + assertSameLimits(MSG_256, makeLog("{m:256}")); + assertSameLimits(BOTH_256, makeLog("{h:256;m:256}")); + assertSameLimits( new Builder().header(Integer.MAX_VALUE).msg(256).build(), - FactoryImpl.createBinaryLog("{h;m:256}")); - assertEquals( + makeLog("{h;m:256}")); + assertSameLimits( new Builder().header(256).msg(Integer.MAX_VALUE).build(), - FactoryImpl.createBinaryLog("{h:256;m}")); + makeLog("{h:256;m}")); } @Test public void createLogFromOptionString_malformed() throws Exception { - assertNull(FactoryImpl.createBinaryLog("bad")); - assertNull(FactoryImpl.createBinaryLog("{bad}")); - assertNull(FactoryImpl.createBinaryLog("{x;y}")); - assertNull(FactoryImpl.createBinaryLog("{h:abc}")); - assertNull(FactoryImpl.createBinaryLog("{2}")); - assertNull(FactoryImpl.createBinaryLog("{2;2}")); + assertNull(makeLog("bad")); + assertNull(makeLog("{bad}")); + assertNull(makeLog("{x;y}")); + assertNull(makeLog("{h:abc}")); + assertNull(makeLog("{2}")); + assertNull(makeLog("{2;2}")); // The grammar specifies that if both h and m are present, h comes before m - assertNull(FactoryImpl.createBinaryLog("{m:123;h:123}")); + assertNull(makeLog("{m:123;h:123}")); // NumberFormatException - assertNull(FactoryImpl.createBinaryLog("{h:99999999999999}")); + assertNull(makeLog("{h:99999999999999}")); } @Test public void configBinLog_multiConfig_withGlobal() throws Exception { - FactoryImpl factory = new FactoryImpl( + String configStr = "*{h}," + "package.both256/*{h:256;m:256}," + "package.service1/both128{h:128;m:128}," - + "package.service2/method_messageOnly{m}"); - assertEquals(HEADER_FULL, factory.getLog("otherpackage.service/method")); + + "package.service2/method_messageOnly{m}"; + assertSameLimits(HEADER_FULL, makeLog(configStr, "otherpackage.service/method")); - assertEquals(BOTH_256, factory.getLog("package.both256/method1")); - assertEquals(BOTH_256, factory.getLog("package.both256/method2")); - assertEquals(BOTH_256, factory.getLog("package.both256/method3")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3")); - assertEquals( - new Builder().header(128).msg(128).build(), factory.getLog("package.service1/both128")); + assertSameLimits( + new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128")); // the global config is in effect - assertEquals(HEADER_FULL, factory.getLog("package.service1/absent")); + assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service1/absent")); - assertEquals(MSG_FULL, factory.getLog("package.service2/method_messageOnly")); + assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly")); // the global config is in effect - assertEquals(HEADER_FULL, factory.getLog("package.service2/absent")); + assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service2/absent")); } @Test public void configBinLog_multiConfig_noGlobal() throws Exception { - FactoryImpl factory = new FactoryImpl( + String configStr = "package.both256/*{h:256;m:256}," + "package.service1/both128{h:128;m:128}," - + "package.service2/method_messageOnly{m}"); - assertNull(factory.getLog("otherpackage.service/method")); + + "package.service2/method_messageOnly{m}"; + assertNull(makeLog(configStr, "otherpackage.service/method")); - assertEquals(BOTH_256, factory.getLog("package.both256/method1")); - assertEquals(BOTH_256, factory.getLog("package.both256/method2")); - assertEquals(BOTH_256, factory.getLog("package.both256/method3")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3")); - assertEquals( - new Builder().header(128).msg(128).build(), factory.getLog("package.service1/both128")); + assertSameLimits( + new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128")); // no global config in effect - assertNull(factory.getLog("package.service1/absent")); + assertNull(makeLog(configStr, "package.service1/absent")); - assertEquals(MSG_FULL, factory.getLog("package.service2/method_messageOnly")); + assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly")); // no global config in effect - assertNull(factory.getLog("package.service2/absent")); + assertNull(makeLog(configStr, "package.service2/absent")); } @Test public void configBinLog_ignoreDuplicates_global() throws Exception { - FactoryImpl factory = new FactoryImpl("*{h},p.s/m,*{h:256}"); + String configStr = "*{h},p.s/m,*{h:256}"; // The duplicate - assertEquals(HEADER_FULL, factory.getLog("p.other1/m")); - assertEquals(HEADER_FULL, factory.getLog("p.other2/m")); + assertSameLimits(HEADER_FULL, makeLog(configStr, "p.other1/m")); + assertSameLimits(HEADER_FULL, makeLog(configStr, "p.other2/m")); // Other - assertEquals(BOTH_FULL, factory.getLog("p.s/m")); + assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m")); } @Test public void configBinLog_ignoreDuplicates_service() throws Exception { - FactoryImpl factory = new FactoryImpl("p.s/*,*{h:256},p.s/*{h}"); + String configStr = "p.s/*,*{h:256},p.s/*{h}"; // The duplicate - assertEquals(BOTH_FULL, factory.getLog("p.s/m1")); - assertEquals(BOTH_FULL, factory.getLog("p.s/m2")); + assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m1")); + assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m2")); // Other - assertEquals(HEADER_256, factory.getLog("p.other1/m")); - assertEquals(HEADER_256, factory.getLog("p.other2/m")); + assertSameLimits(HEADER_256, makeLog(configStr, "p.other1/m")); + assertSameLimits(HEADER_256, makeLog(configStr, "p.other2/m")); } @Test public void configBinLog_ignoreDuplicates_method() throws Exception { - FactoryImpl factory = new FactoryImpl("p.s/m,*{h:256},p.s/m{h}"); + String configStr = "p.s/m,*{h:256},p.s/m{h}"; // The duplicate - assertEquals(BOTH_FULL, factory.getLog("p.s/m")); + assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m")); // Other - assertEquals(HEADER_256, factory.getLog("p.other1/m")); - assertEquals(HEADER_256, factory.getLog("p.other2/m")); + assertSameLimits(HEADER_256, makeLog(configStr, "p.other1/m")); + assertSameLimits(HEADER_256, makeLog(configStr, "p.other2/m")); } @Test @@ -419,38 +434,62 @@ public final class BinaryLogTest { @Test public void messageToProto() throws Exception { - byte[] bytes = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - .getBytes(US_ASCII); + ByteBuffer bytes = ByteBuffer.wrap( + "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII)); + int remains = bytes.remaining(); Message message = BinaryLog.messageToProto(bytes, false, Integer.MAX_VALUE); assertEquals( Message .newBuilder() - .setData(ByteString.copyFrom(bytes)) + .setData(ByteString.copyFrom(bytes.duplicate())) + .setFlags(0) + .setLength(bytes.remaining()) + .build(), + message); + assertEquals(remains, bytes.remaining()); + } + + @Test + public void messageToProto_offsetByteBuffer() throws Exception { + String padding = "aaaaa"; + String body = "the actual message"; + String truncatedRemainder = "zzzz"; + ByteBuffer bytes = ByteBuffer.wrap( + (padding + body + truncatedRemainder).getBytes(US_ASCII)); + bytes.position(padding.length()); + int remains = bytes.remaining(); + Message message = BinaryLog.messageToProto(bytes, false, body.length()); + assertEquals( + Message + .newBuilder() + .setData(ByteString.copyFrom(body.getBytes(US_ASCII))) .setFlags(0) - .setLength(bytes.length) + .setLength(body.length() + truncatedRemainder.length()) .build(), message); + assertEquals(remains, bytes.remaining()); } @Test public void messageToProto_truncated() throws Exception { - byte[] bytes = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" - .getBytes(US_ASCII); + ByteBuffer bytes = ByteBuffer.wrap( + "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII)); assertEquals( Message .newBuilder() .setFlags(0) - .setLength(bytes.length) + .setLength(bytes.remaining()) .build(), BinaryLog.messageToProto(bytes, false, 0)); int limit = 10; + String truncatedMessage = "this is a "; assertEquals( Message .newBuilder() - .setData(ByteString.copyFrom(bytes, 0, limit)) + .setData(UnsafeByteOperations.unsafeWrap(truncatedMessage.getBytes(US_ASCII))) .setFlags(0) - .setLength(bytes.length) + .setLength(bytes.remaining()) .build(), BinaryLog.messageToProto(bytes, false, limit)); } @@ -461,6 +500,196 @@ public final class BinaryLogTest { assertEquals(1, BinaryLog.flagsForMessage(IS_COMPRESSED)); } + @Test + public void logSendInitialMetadata_server() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + binaryLog.logSendInitialMetadata(metadata, IS_SERVER, CALL_ID, socketAddress); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setPeer(BinaryLog.socketToProto(socketAddress)) + .setMetadata(BinaryLog.metadataToProto(metadata, 10)) + .build()); + } + + @Test + public void logSendInitialMetadata_client() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + binaryLog.logSendInitialMetadata(metadata, IS_CLIENT, CALL_ID, socketAddress); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setPeer(BinaryLog.socketToProto(socketAddress)) + .setMetadata(BinaryLog.metadataToProto(metadata, 10)) + .build()); + } + + @Test + public void logRecvInitialMetadata_server() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + binaryLog.logRecvInitialMetadata(metadata, IS_SERVER, CALL_ID, socketAddress); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setPeer(BinaryLog.socketToProto(socketAddress)) + .setMetadata(BinaryLog.metadataToProto(metadata, 10)) + .build()); + } + + @Test + public void logRecvInitialMetadata_client() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + binaryLog.logRecvInitialMetadata(metadata, IS_CLIENT, CALL_ID, socketAddress); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setPeer(BinaryLog.socketToProto(socketAddress)) + .setMetadata(BinaryLog.metadataToProto(metadata, 10)) + .build()); + } + + @Test + public void logTrailingMetadata_server() throws Exception { + binaryLog.logTrailingMetadata(metadata, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_TRAILING_METADATA) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMetadata(BinaryLog.metadataToProto(metadata, 10)) + .build()); + } + + @Test + public void logTrailingMetadata_client() throws Exception { + binaryLog.logTrailingMetadata(metadata, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_TRAILING_METADATA) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMetadata(BinaryLog.metadataToProto(metadata, 10)) + .build()); + } + + @Test + public void logOutboundMessage_server() throws Exception { + binaryLog.logOutboundMessage(message, IS_COMPRESSED, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_MESSAGE) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) + .build()); + + binaryLog.logOutboundMessage(message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_MESSAGE) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMessage(BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) + .build()); + verifyNoMoreInteractions(sink); + } + + @Test + public void logOutboundMessage_client() throws Exception { + binaryLog.logOutboundMessage(message, IS_COMPRESSED, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_MESSAGE) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) + .build()); + + binaryLog.logOutboundMessage(message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_MESSAGE) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMessage(BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) + .build()); + verifyNoMoreInteractions(sink); + } + + @Test + public void logInboundMessage_server() throws Exception { + binaryLog.logInboundMessage(message, IS_COMPRESSED, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_MESSAGE) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) + .build()); + + binaryLog.logInboundMessage(message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_MESSAGE) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMessage(BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) + .build()); + verifyNoMoreInteractions(sink); + } + + @Test + public void logInboundMessage_client() throws Exception { + binaryLog.logInboundMessage(message, IS_COMPRESSED, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_MESSAGE) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) + .build()); + + binaryLog.logInboundMessage(message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_MESSAGE) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinaryLog.callIdToProto(CALL_ID)) + .setMessage(BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) + .build()); + verifyNoMoreInteractions(sink); + } + /** A builder class to make unit test code more readable. */ private static final class Builder { int maxHeaderBytes = 0; @@ -477,7 +706,20 @@ public final class BinaryLogTest { } BinaryLog build() { - return new BinaryLog(maxHeaderBytes, maxMessageBytes); + return new BinaryLog(mock(BinaryLogSink.class), maxHeaderBytes, maxMessageBytes); } } + + private static void assertSameLimits(BinaryLog a, BinaryLog b) { + assertEquals(a.maxMessageBytes, b.maxMessageBytes); + assertEquals(a.maxHeaderBytes, b.maxHeaderBytes); + } + + private BinaryLog makeLog(String factoryConfigStr, String lookup) { + return new BinaryLog.FactoryImpl(sink, factoryConfigStr).getLog(lookup); + } + + private BinaryLog makeLog(String logConfigStr) { + return FactoryImpl.createBinaryLog(sink, logConfigStr); + } } -- cgit v1.2.3