aboutsummaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2018-02-16 13:47:31 -0800
committerGitHub <noreply@github.com>2018-02-16 13:47:31 -0800
commitbde2ba2444670764ec9c549f9adde220638c5ef4 (patch)
tree8801d4143627747dcf15152db6e791d6813213f6 /services
parent0c27e719ae427c78e9090afff9fa8ecad6f91b74 (diff)
downloadgrpc-grpc-java-bde2ba2444670764ec9c549f9adde220638c5ef4.tar.gz
services: introduce BinaryLogSinkProvider (#3917)
The `BinaryLog` will write `GrpcLogEntry`s to the sink, which is intended as a pluggable interface. It is the sink's responsibility to send the binlog protos to disk, to a remote logging service, etc.
Diffstat (limited to 'services')
-rw-r--r--services/src/main/java/io/grpc/services/BinaryLog.java134
-rw-r--r--services/src/main/java/io/grpc/services/BinaryLogSink.java41
-rw-r--r--services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java53
-rw-r--r--services/src/test/java/io/grpc/services/BinaryLogTest.java440
4 files changed, 552 insertions, 116 deletions
diff --git a/services/src/main/java/io/grpc/services/BinaryLog.java b/services/src/main/java/io/grpc/services/BinaryLog.java
index 522051bb0..6523ebf64 100644
--- a/services/src/main/java/io/grpc/services/BinaryLog.java
+++ b/services/src/main/java/io/grpc/services/BinaryLog.java
@@ -25,12 +25,15 @@ import com.google.protobuf.ByteString;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
+import io.grpc.binarylog.GrpcLogEntry;
+import io.grpc.binarylog.GrpcLogEntry.Type;
import io.grpc.binarylog.Message;
import io.grpc.binarylog.Metadata.Builder;
import io.grpc.binarylog.MetadataEntry;
import io.grpc.binarylog.Peer;
import io.grpc.binarylog.Peer.PeerType;
import io.grpc.binarylog.Uint128;
+import java.io.InputStream;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
@@ -46,25 +49,115 @@ import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
/**
* A binary log class that is configured for a specific {@link MethodDescriptor}.
*/
+// TODO(zpencer): this is really a per-method logger class, make the class name more clear.
+@ThreadSafe
final class BinaryLog {
private static final Logger logger = Logger.getLogger(BinaryLog.class.getName());
private static final int IP_PORT_BYTES = 2;
private static final int IP_PORT_UPPER_MASK = 0xff00;
private static final int IP_PORT_LOWER_MASK = 0xff;
- private final int maxHeaderBytes;
- private final int maxMessageBytes;
+ private final BinaryLogSink sink;
+ @VisibleForTesting
+ final int maxHeaderBytes;
+ @VisibleForTesting
+ final int maxMessageBytes;
@VisibleForTesting
- BinaryLog(int maxHeaderBytes, int maxMessageBytes) {
+ BinaryLog(BinaryLogSink sink, int maxHeaderBytes, int maxMessageBytes) {
+ this.sink = sink;
this.maxHeaderBytes = maxHeaderBytes;
this.maxMessageBytes = maxMessageBytes;
}
+ /**
+ * Logs the sending of initial metadata. This method logs the appropriate number of bytes
+ * as determined by the binary logging configuration.
+ */
+ public void logSendInitialMetadata(
+ Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket) {
+ GrpcLogEntry entry = GrpcLogEntry
+ .newBuilder()
+ .setType(Type.SEND_INITIAL_METADATA)
+ .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
+ .setCallId(callIdToProto(callId))
+ .setPeer(socketToProto(peerSocket))
+ .setMetadata(metadataToProto(metadata, maxHeaderBytes))
+ .build();
+ sink.write(entry);
+ }
+
+ /**
+ * Logs the receiving of initial metadata. This method logs the appropriate number of bytes
+ * as determined by the binary logging configuration.
+ */
+ public void logRecvInitialMetadata(
+ Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket) {
+ GrpcLogEntry entry = GrpcLogEntry
+ .newBuilder()
+ .setType(Type.RECV_INITIAL_METADATA)
+ .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
+ .setCallId(callIdToProto(callId))
+ .setPeer(socketToProto(peerSocket))
+ .setMetadata(metadataToProto(metadata, maxHeaderBytes))
+ .build();
+ sink.write(entry);
+ }
+
+ /**
+ * Logs the trailing metadata. This method logs the appropriate number of bytes
+ * as determined by the binary logging configuration.
+ */
+ public void logTrailingMetadata(Metadata metadata, boolean isServer, byte[] callId) {
+ GrpcLogEntry entry = GrpcLogEntry
+ .newBuilder()
+ .setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA)
+ .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
+ .setCallId(callIdToProto(callId))
+ .setMetadata(metadataToProto(metadata, maxHeaderBytes))
+ .build();
+ sink.write(entry);
+ }
+
+ /**
+ * Logs the outbound message. This method logs the appropriate number of bytes from
+ * {@code message}, and returns an {@link InputStream} that contains the original message.
+ * The number of bytes logged is determined by the binary logging configuration.
+ */
+ public void logOutboundMessage(
+ ByteBuffer message, boolean compressed, boolean isServer, byte[] callId) {
+ GrpcLogEntry entry = GrpcLogEntry
+ .newBuilder()
+ .setType(Type.SEND_MESSAGE)
+ .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
+ .setCallId(callIdToProto(callId))
+ .setMessage(messageToProto(message, compressed, maxMessageBytes))
+ .build();
+ sink.write(entry);
+ }
+
+ /**
+ * Logs the inbound message. This method logs the appropriate number of bytes from
+ * {@code message}, and returns an {@link InputStream} that contains the original message.
+ * The number of bytes logged is determined by the binary logging configuration.
+ */
+ public void logInboundMessage(
+ ByteBuffer message, boolean compressed, boolean isServer, byte[] callId) {
+ GrpcLogEntry entry = GrpcLogEntry
+ .newBuilder()
+ .setType(Type.RECV_MESSAGE)
+ .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
+ .setCallId(callIdToProto(callId))
+ .setMessage(messageToProto(message, compressed, maxMessageBytes))
+ .build();
+ sink.write(entry);
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof BinaryLog)) {
@@ -72,7 +165,8 @@ final class BinaryLog {
}
BinaryLog that = (BinaryLog) o;
return this.maxHeaderBytes == that.maxHeaderBytes
- && this.maxMessageBytes == that.maxMessageBytes;
+ && this.maxMessageBytes == that.maxMessageBytes
+ && this.sink.equals(that.sink);
}
@Override
@@ -84,7 +178,9 @@ final class BinaryLog {
public String toString() {
return getClass().getSimpleName() + '['
+ "maxHeaderBytes=" + maxHeaderBytes + ", "
- + "maxMessageBytes=" + maxMessageBytes + "]";
+ + "maxMessageBytes=" + maxMessageBytes
+ + "sink=" + sink
+ + "]";
}
private static final Factory DEFAULT_FACTORY;
@@ -94,8 +190,10 @@ final class BinaryLog {
Factory defaultFactory = NULL_FACTORY;
try {
String configStr = System.getenv("GRPC_BINARY_LOG_CONFIG");
- if (configStr != null && configStr.length() > 0) {
- defaultFactory = new FactoryImpl(configStr);
+ // TODO(zpencer): make BinaryLog.java implement isAvailable, and put this check there
+ BinaryLogSink sink = BinaryLogSinkProvider.provider();
+ if (sink != null && configStr != null && configStr.length() > 0) {
+ defaultFactory = new FactoryImpl(sink, configStr);
}
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to initialize binary log. Disabling binary log.", t);
@@ -140,7 +238,7 @@ final class BinaryLog {
* Accepts a string in the format specified by the binary log spec.
*/
@VisibleForTesting
- FactoryImpl(String configurationString) {
+ FactoryImpl(BinaryLogSink sink, String configurationString) {
Preconditions.checkState(configurationString != null && configurationString.length() > 0);
BinaryLog globalLog = null;
Map<String, BinaryLog> perServiceLogs = new HashMap<String, BinaryLog>();
@@ -153,7 +251,7 @@ final class BinaryLog {
}
String methodOrSvc = configMatcher.group(1);
String binlogOptionStr = configMatcher.group(2);
- BinaryLog binLog = createBinaryLog(binlogOptionStr);
+ BinaryLog binLog = createBinaryLog(sink, binlogOptionStr);
if (binLog == null) {
continue;
}
@@ -215,9 +313,9 @@ final class BinaryLog {
*/
@VisibleForTesting
@Nullable
- static BinaryLog createBinaryLog(@Nullable String logConfig) {
+ static BinaryLog createBinaryLog(BinaryLogSink sink, @Nullable String logConfig) {
if (logConfig == null) {
- return new BinaryLog(Integer.MAX_VALUE, Integer.MAX_VALUE);
+ return new BinaryLog(sink, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
try {
Matcher headerMatcher;
@@ -244,7 +342,7 @@ final class BinaryLog {
logger.log(Level.SEVERE, "Illegal log config pattern: " + logConfig);
return null;
}
- return new BinaryLog(maxHeaderBytes, maxMsgBytes);
+ return new BinaryLog(sink, maxHeaderBytes, maxMsgBytes);
} catch (NumberFormatException e) {
logger.log(Level.SEVERE, "Illegal log config pattern: " + logConfig);
return null;
@@ -341,15 +439,17 @@ final class BinaryLog {
}
@VisibleForTesting
- static Message messageToProto(byte[] message, boolean compressed, int maxMessageBytes) {
- Preconditions.checkState(maxMessageBytes >= 0);
+ static Message messageToProto(ByteBuffer message, boolean compressed, int maxMessageBytes) {
+ int messageSize = message.remaining();
Message.Builder builder = Message
.newBuilder()
.setFlags(flagsForMessage(compressed))
- .setLength(message.length);
+ .setLength(messageSize);
if (maxMessageBytes > 0) {
- int limit = Math.min(maxMessageBytes, message.length);
- builder.setData(ByteString.copyFrom(message, 0, limit));
+ int desiredRemaining = Math.min(maxMessageBytes, messageSize);
+ ByteBuffer dup = message.duplicate();
+ dup.limit(dup.position() + desiredRemaining);
+ builder.setData(ByteString.copyFrom(dup));
}
return builder.build();
}
diff --git a/services/src/main/java/io/grpc/services/BinaryLogSink.java b/services/src/main/java/io/grpc/services/BinaryLogSink.java
new file mode 100644
index 000000000..5e60bedc4
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/BinaryLogSink.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import com.google.protobuf.MessageLite;
+import java.io.Closeable;
+
+public abstract class BinaryLogSink implements Closeable {
+ /**
+ * Writes the {@code message} to the destination.
+ */
+ public abstract void write(MessageLite message);
+
+ /**
+ * Whether this provider is available for use, taking the current environment into consideration.
+ * If {@code false}, no other methods are safe to be called.
+ */
+ protected abstract boolean isAvailable();
+
+ /**
+ * A priority, from 0 to 10 that this provider should be used, taking the current environment into
+ * consideration. 5 should be considered the default, and then tweaked based on environment
+ * detection. A priority of 0 does not imply that the provider wouldn't work; just that it should
+ * be last in line.
+ */
+ protected abstract int priority();
+}
diff --git a/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java b/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java
new file mode 100644
index 000000000..13aab5133
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/BinaryLogSinkProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2017, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import io.grpc.InternalServiceProviders;
+import java.util.Collections;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Subclasses must be thread safe, and are responsible for writing the binary log message to
+ * the appropriate destination.
+ */
+@ThreadSafe
+final class BinaryLogSinkProvider {
+ private static final BinaryLogSink INSTANCE = InternalServiceProviders.load(
+ BinaryLogSink.class,
+ Collections.<Class<?>>emptyList(),
+ BinaryLogSinkProvider.class.getClassLoader(),
+ new InternalServiceProviders.PriorityAccessor<BinaryLogSink>() {
+ @Override
+ public boolean isAvailable(BinaryLogSink provider) {
+ return provider.isAvailable();
+ }
+
+ @Override
+ public int getPriority(BinaryLogSink provider) {
+ return provider.priority();
+ }
+ });
+
+ /**
+ * Returns the {@code BinaryLogSink} that should be used.
+ */
+ @Nullable
+ static BinaryLogSink provider() {
+ return INSTANCE;
+ }
+}
diff --git a/services/src/test/java/io/grpc/services/BinaryLogTest.java b/services/src/test/java/io/grpc/services/BinaryLogTest.java
index de95d1558..287a94f74 100644
--- a/services/src/test/java/io/grpc/services/BinaryLogTest.java
+++ b/services/src/test/java/io/grpc/services/BinaryLogTest.java
@@ -20,10 +20,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.primitives.Bytes;
import com.google.protobuf.ByteString;
+import com.google.protobuf.UnsafeByteOperations;
import io.grpc.Metadata;
+import io.grpc.binarylog.GrpcLogEntry;
import io.grpc.binarylog.Message;
import io.grpc.binarylog.MetadataEntry;
import io.grpc.binarylog.Peer;
@@ -64,7 +69,7 @@ public final class BinaryLogTest {
private static final Metadata.Key<String> KEY_C =
Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER);
private static final MetadataEntry ENTRY_A =
- MetadataEntry
+ MetadataEntry
.newBuilder()
.setKey(ByteString.copyFrom(KEY_A.name(), US_ASCII))
.setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII)))
@@ -81,10 +86,20 @@ public final class BinaryLogTest {
.setKey(ByteString.copyFrom(KEY_C.name(), US_ASCII))
.setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII)))
.build();
+ private static final boolean IS_SERVER = true;
+ private static final boolean IS_CLIENT = false;
private static final boolean IS_COMPRESSED = true;
private static final boolean IS_UNCOMPRESSED = false;
+ private static final byte[] CALL_ID = new byte[] {
+ 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18,
+ 0x19, 0x10, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f };
+ private static final int HEADER_LIMIT = 10;
+ private static final int MESSAGE_LIMIT = Integer.MAX_VALUE;
private final Metadata metadata = new Metadata();
+ private final BinaryLogSink sink = mock(BinaryLogSink.class);
+ private final BinaryLog binaryLog = new BinaryLog(sink, HEADER_LIMIT, MESSAGE_LIMIT);
+ private final ByteBuffer message = ByteBuffer.wrap(new byte[100]);
@Before
public void setUp() throws Exception {
@@ -95,169 +110,169 @@ public final class BinaryLogTest {
@Test
public void configBinLog_global() throws Exception {
- assertEquals(BOTH_FULL, new FactoryImpl("*").getLog("p.s/m"));
- assertEquals(BOTH_FULL, new FactoryImpl("*{h;m}").getLog("p.s/m"));
- assertEquals(HEADER_FULL, new FactoryImpl("*{h}").getLog("p.s/m"));
- assertEquals(MSG_FULL, new FactoryImpl("*{m}").getLog("p.s/m"));
- assertEquals(HEADER_256, new FactoryImpl("*{h:256}").getLog("p.s/m"));
- assertEquals(MSG_256, new FactoryImpl("*{m:256}").getLog("p.s/m"));
- assertEquals(BOTH_256, new FactoryImpl("*{h:256;m:256}").getLog("p.s/m"));
- assertEquals(
+ assertSameLimits(BOTH_FULL, makeLog("*", "p.s/m"));
+ assertSameLimits(BOTH_FULL, makeLog("*{h;m}", "p.s/m"));
+ assertSameLimits(HEADER_FULL, makeLog("*{h}", "p.s/m"));
+ assertSameLimits(MSG_FULL, makeLog("*{m}", "p.s/m"));
+ assertSameLimits(HEADER_256, makeLog("*{h:256}", "p.s/m"));
+ assertSameLimits(MSG_256, makeLog("*{m:256}", "p.s/m"));
+ assertSameLimits(BOTH_256, makeLog("*{h:256;m:256}", "p.s/m"));
+ assertSameLimits(
new Builder().header(Integer.MAX_VALUE).msg(256).build(),
- new FactoryImpl("*{h;m:256}").getLog("p.s/m"));
- assertEquals(
+ makeLog("*{h;m:256}", "p.s/m"));
+ assertSameLimits(
new Builder().header(256).msg(Integer.MAX_VALUE).build(),
- new FactoryImpl("*{h:256;m}").getLog("p.s/m"));
+ makeLog("*{h:256;m}", "p.s/m"));
}
@Test
public void configBinLog_method() throws Exception {
- assertEquals(BOTH_FULL, new FactoryImpl("p.s/m").getLog("p.s/m"));
- assertEquals(BOTH_FULL, new FactoryImpl("p.s/m{h;m}").getLog("p.s/m"));
- assertEquals(HEADER_FULL, new FactoryImpl("p.s/m{h}").getLog("p.s/m"));
- assertEquals(MSG_FULL, new FactoryImpl("p.s/m{m}").getLog("p.s/m"));
- assertEquals(HEADER_256, new FactoryImpl("p.s/m{h:256}").getLog("p.s/m"));
- assertEquals(MSG_256, new FactoryImpl("p.s/m{m:256}").getLog("p.s/m"));
- assertEquals(BOTH_256, new FactoryImpl("p.s/m{h:256;m:256}").getLog("p.s/m"));
- assertEquals(
+ assertSameLimits(BOTH_FULL, makeLog("p.s/m", "p.s/m"));
+ assertSameLimits(BOTH_FULL, makeLog("p.s/m{h;m}", "p.s/m"));
+ assertSameLimits(HEADER_FULL, makeLog("p.s/m{h}", "p.s/m"));
+ assertSameLimits(MSG_FULL, makeLog("p.s/m{m}", "p.s/m"));
+ assertSameLimits(HEADER_256, makeLog("p.s/m{h:256}", "p.s/m"));
+ assertSameLimits(MSG_256, makeLog("p.s/m{m:256}", "p.s/m"));
+ assertSameLimits(BOTH_256, makeLog("p.s/m{h:256;m:256}", "p.s/m"));
+ assertSameLimits(
new Builder().header(Integer.MAX_VALUE).msg(256).build(),
- new FactoryImpl("p.s/m{h;m:256}").getLog("p.s/m"));
- assertEquals(
+ makeLog("p.s/m{h;m:256}", "p.s/m"));
+ assertSameLimits(
new Builder().header(256).msg(Integer.MAX_VALUE).build(),
- new FactoryImpl("p.s/m{h:256;m}").getLog("p.s/m"));
+ makeLog("p.s/m{h:256;m}", "p.s/m"));
}
@Test
public void configBinLog_method_absent() throws Exception {
- assertNull(new FactoryImpl("p.s/m").getLog("p.s/absent"));
+ assertNull(makeLog("p.s/m", "p.s/absent"));
}
@Test
public void configBinLog_service() throws Exception {
- assertEquals(BOTH_FULL, new FactoryImpl("p.s/*").getLog("p.s/m"));
- assertEquals(BOTH_FULL, new FactoryImpl("p.s/*{h;m}").getLog("p.s/m"));
- assertEquals(HEADER_FULL, new FactoryImpl("p.s/*{h}").getLog("p.s/m"));
- assertEquals(MSG_FULL, new FactoryImpl("p.s/*{m}").getLog("p.s/m"));
- assertEquals(HEADER_256, new FactoryImpl("p.s/*{h:256}").getLog("p.s/m"));
- assertEquals(MSG_256, new FactoryImpl("p.s/*{m:256}").getLog("p.s/m"));
- assertEquals(BOTH_256, new FactoryImpl("p.s/*{h:256;m:256}").getLog("p.s/m"));
- assertEquals(
+ assertSameLimits(BOTH_FULL, makeLog("p.s/*", "p.s/m"));
+ assertSameLimits(BOTH_FULL, makeLog("p.s/*{h;m}", "p.s/m"));
+ assertSameLimits(HEADER_FULL, makeLog("p.s/*{h}", "p.s/m"));
+ assertSameLimits(MSG_FULL, makeLog("p.s/*{m}", "p.s/m"));
+ assertSameLimits(HEADER_256, makeLog("p.s/*{h:256}", "p.s/m"));
+ assertSameLimits(MSG_256, makeLog("p.s/*{m:256}", "p.s/m"));
+ assertSameLimits(BOTH_256, makeLog("p.s/*{h:256;m:256}", "p.s/m"));
+ assertSameLimits(
new Builder().header(Integer.MAX_VALUE).msg(256).build(),
- new FactoryImpl("p.s/*{h;m:256}").getLog("p.s/m"));
- assertEquals(
+ makeLog("p.s/*{h;m:256}", "p.s/m"));
+ assertSameLimits(
new Builder().header(256).msg(Integer.MAX_VALUE).build(),
- new FactoryImpl("p.s/*{h:256;m}").getLog("p.s/m"));
+ makeLog("p.s/*{h:256;m}", "p.s/m"));
}
@Test
public void configBinLog_service_absent() throws Exception {
- assertNull(new FactoryImpl("p.s/*").getLog("p.other/m"));
+ assertNull(makeLog("p.s/*", "p.other/m"));
}
@Test
public void createLogFromOptionString() throws Exception {
- assertEquals(BOTH_FULL, FactoryImpl.createBinaryLog(/*logConfig=*/ null));
- assertEquals(HEADER_FULL, FactoryImpl.createBinaryLog("{h}"));
- assertEquals(MSG_FULL, FactoryImpl.createBinaryLog("{m}"));
- assertEquals(HEADER_256, FactoryImpl.createBinaryLog("{h:256}"));
- assertEquals(MSG_256, FactoryImpl.createBinaryLog("{m:256}"));
- assertEquals(BOTH_256, FactoryImpl.createBinaryLog("{h:256;m:256}"));
- assertEquals(
+ assertSameLimits(BOTH_FULL, makeLog(null));
+ assertSameLimits(HEADER_FULL, makeLog("{h}"));
+ assertSameLimits(MSG_FULL, makeLog("{m}"));
+ assertSameLimits(HEADER_256, makeLog("{h:256}"));
+ assertSameLimits(MSG_256, makeLog("{m:256}"));
+ assertSameLimits(BOTH_256, makeLog("{h:256;m:256}"));
+ assertSameLimits(
new Builder().header(Integer.MAX_VALUE).msg(256).build(),
- FactoryImpl.createBinaryLog("{h;m:256}"));
- assertEquals(
+ makeLog("{h;m:256}"));
+ assertSameLimits(
new Builder().header(256).msg(Integer.MAX_VALUE).build(),
- FactoryImpl.createBinaryLog("{h:256;m}"));
+ makeLog("{h:256;m}"));
}
@Test
public void createLogFromOptionString_malformed() throws Exception {
- assertNull(FactoryImpl.createBinaryLog("bad"));
- assertNull(FactoryImpl.createBinaryLog("{bad}"));
- assertNull(FactoryImpl.createBinaryLog("{x;y}"));
- assertNull(FactoryImpl.createBinaryLog("{h:abc}"));
- assertNull(FactoryImpl.createBinaryLog("{2}"));
- assertNull(FactoryImpl.createBinaryLog("{2;2}"));
+ assertNull(makeLog("bad"));
+ assertNull(makeLog("{bad}"));
+ assertNull(makeLog("{x;y}"));
+ assertNull(makeLog("{h:abc}"));
+ assertNull(makeLog("{2}"));
+ assertNull(makeLog("{2;2}"));
// The grammar specifies that if both h and m are present, h comes before m
- assertNull(FactoryImpl.createBinaryLog("{m:123;h:123}"));
+ assertNull(makeLog("{m:123;h:123}"));
// NumberFormatException
- assertNull(FactoryImpl.createBinaryLog("{h:99999999999999}"));
+ assertNull(makeLog("{h:99999999999999}"));
}
@Test
public void configBinLog_multiConfig_withGlobal() throws Exception {
- FactoryImpl factory = new FactoryImpl(
+ String configStr =
"*{h},"
+ "package.both256/*{h:256;m:256},"
+ "package.service1/both128{h:128;m:128},"
- + "package.service2/method_messageOnly{m}");
- assertEquals(HEADER_FULL, factory.getLog("otherpackage.service/method"));
+ + "package.service2/method_messageOnly{m}";
+ assertSameLimits(HEADER_FULL, makeLog(configStr, "otherpackage.service/method"));
- assertEquals(BOTH_256, factory.getLog("package.both256/method1"));
- assertEquals(BOTH_256, factory.getLog("package.both256/method2"));
- assertEquals(BOTH_256, factory.getLog("package.both256/method3"));
+ assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1"));
+ assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2"));
+ assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3"));
- assertEquals(
- new Builder().header(128).msg(128).build(), factory.getLog("package.service1/both128"));
+ assertSameLimits(
+ new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128"));
// the global config is in effect
- assertEquals(HEADER_FULL, factory.getLog("package.service1/absent"));
+ assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service1/absent"));
- assertEquals(MSG_FULL, factory.getLog("package.service2/method_messageOnly"));
+ assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly"));
// the global config is in effect
- assertEquals(HEADER_FULL, factory.getLog("package.service2/absent"));
+ assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service2/absent"));
}
@Test
public void configBinLog_multiConfig_noGlobal() throws Exception {
- FactoryImpl factory = new FactoryImpl(
+ String configStr =
"package.both256/*{h:256;m:256},"
+ "package.service1/both128{h:128;m:128},"
- + "package.service2/method_messageOnly{m}");
- assertNull(factory.getLog("otherpackage.service/method"));
+ + "package.service2/method_messageOnly{m}";
+ assertNull(makeLog(configStr, "otherpackage.service/method"));
- assertEquals(BOTH_256, factory.getLog("package.both256/method1"));
- assertEquals(BOTH_256, factory.getLog("package.both256/method2"));
- assertEquals(BOTH_256, factory.getLog("package.both256/method3"));
+ assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1"));
+ assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2"));
+ assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3"));
- assertEquals(
- new Builder().header(128).msg(128).build(), factory.getLog("package.service1/both128"));
+ assertSameLimits(
+ new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128"));
// no global config in effect
- assertNull(factory.getLog("package.service1/absent"));
+ assertNull(makeLog(configStr, "package.service1/absent"));
- assertEquals(MSG_FULL, factory.getLog("package.service2/method_messageOnly"));
+ assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly"));
// no global config in effect
- assertNull(factory.getLog("package.service2/absent"));
+ assertNull(makeLog(configStr, "package.service2/absent"));
}
@Test
public void configBinLog_ignoreDuplicates_global() throws Exception {
- FactoryImpl factory = new FactoryImpl("*{h},p.s/m,*{h:256}");
+ String configStr = "*{h},p.s/m,*{h:256}";
// The duplicate
- assertEquals(HEADER_FULL, factory.getLog("p.other1/m"));
- assertEquals(HEADER_FULL, factory.getLog("p.other2/m"));
+ assertSameLimits(HEADER_FULL, makeLog(configStr, "p.other1/m"));
+ assertSameLimits(HEADER_FULL, makeLog(configStr, "p.other2/m"));
// Other
- assertEquals(BOTH_FULL, factory.getLog("p.s/m"));
+ assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m"));
}
@Test
public void configBinLog_ignoreDuplicates_service() throws Exception {
- FactoryImpl factory = new FactoryImpl("p.s/*,*{h:256},p.s/*{h}");
+ String configStr = "p.s/*,*{h:256},p.s/*{h}";
// The duplicate
- assertEquals(BOTH_FULL, factory.getLog("p.s/m1"));
- assertEquals(BOTH_FULL, factory.getLog("p.s/m2"));
+ assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m1"));
+ assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m2"));
// Other
- assertEquals(HEADER_256, factory.getLog("p.other1/m"));
- assertEquals(HEADER_256, factory.getLog("p.other2/m"));
+ assertSameLimits(HEADER_256, makeLog(configStr, "p.other1/m"));
+ assertSameLimits(HEADER_256, makeLog(configStr, "p.other2/m"));
}
@Test
public void configBinLog_ignoreDuplicates_method() throws Exception {
- FactoryImpl factory = new FactoryImpl("p.s/m,*{h:256},p.s/m{h}");
+ String configStr = "p.s/m,*{h:256},p.s/m{h}";
// The duplicate
- assertEquals(BOTH_FULL, factory.getLog("p.s/m"));
+ assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m"));
// Other
- assertEquals(HEADER_256, factory.getLog("p.other1/m"));
- assertEquals(HEADER_256, factory.getLog("p.other2/m"));
+ assertSameLimits(HEADER_256, makeLog(configStr, "p.other1/m"));
+ assertSameLimits(HEADER_256, makeLog(configStr, "p.other2/m"));
}
@Test
@@ -419,38 +434,62 @@ public final class BinaryLogTest {
@Test
public void messageToProto() throws Exception {
- byte[] bytes = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
- .getBytes(US_ASCII);
+ ByteBuffer bytes = ByteBuffer.wrap(
+ "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII));
+ int remains = bytes.remaining();
Message message = BinaryLog.messageToProto(bytes, false, Integer.MAX_VALUE);
assertEquals(
Message
.newBuilder()
- .setData(ByteString.copyFrom(bytes))
+ .setData(ByteString.copyFrom(bytes.duplicate()))
+ .setFlags(0)
+ .setLength(bytes.remaining())
+ .build(),
+ message);
+ assertEquals(remains, bytes.remaining());
+ }
+
+ @Test
+ public void messageToProto_offsetByteBuffer() throws Exception {
+ String padding = "aaaaa";
+ String body = "the actual message";
+ String truncatedRemainder = "zzzz";
+ ByteBuffer bytes = ByteBuffer.wrap(
+ (padding + body + truncatedRemainder).getBytes(US_ASCII));
+ bytes.position(padding.length());
+ int remains = bytes.remaining();
+ Message message = BinaryLog.messageToProto(bytes, false, body.length());
+ assertEquals(
+ Message
+ .newBuilder()
+ .setData(ByteString.copyFrom(body.getBytes(US_ASCII)))
.setFlags(0)
- .setLength(bytes.length)
+ .setLength(body.length() + truncatedRemainder.length())
.build(),
message);
+ assertEquals(remains, bytes.remaining());
}
@Test
public void messageToProto_truncated() throws Exception {
- byte[] bytes = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
- .getBytes(US_ASCII);
+ ByteBuffer bytes = ByteBuffer.wrap(
+ "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII));
assertEquals(
Message
.newBuilder()
.setFlags(0)
- .setLength(bytes.length)
+ .setLength(bytes.remaining())
.build(),
BinaryLog.messageToProto(bytes, false, 0));
int limit = 10;
+ String truncatedMessage = "this is a ";
assertEquals(
Message
.newBuilder()
- .setData(ByteString.copyFrom(bytes, 0, limit))
+ .setData(UnsafeByteOperations.unsafeWrap(truncatedMessage.getBytes(US_ASCII)))
.setFlags(0)
- .setLength(bytes.length)
+ .setLength(bytes.remaining())
.build(),
BinaryLog.messageToProto(bytes, false, limit));
}
@@ -461,6 +500,196 @@ public final class BinaryLogTest {
assertEquals(1, BinaryLog.flagsForMessage(IS_COMPRESSED));
}
+ @Test
+ public void logSendInitialMetadata_server() throws Exception {
+ InetAddress address = InetAddress.getByName("127.0.0.1");
+ int port = 12345;
+ InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+ binaryLog.logSendInitialMetadata(metadata, IS_SERVER, CALL_ID, socketAddress);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
+ .setLogger(GrpcLogEntry.Logger.SERVER)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setPeer(BinaryLog.socketToProto(socketAddress))
+ .setMetadata(BinaryLog.metadataToProto(metadata, 10))
+ .build());
+ }
+
+ @Test
+ public void logSendInitialMetadata_client() throws Exception {
+ InetAddress address = InetAddress.getByName("127.0.0.1");
+ int port = 12345;
+ InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+ binaryLog.logSendInitialMetadata(metadata, IS_CLIENT, CALL_ID, socketAddress);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
+ .setLogger(GrpcLogEntry.Logger.CLIENT)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setPeer(BinaryLog.socketToProto(socketAddress))
+ .setMetadata(BinaryLog.metadataToProto(metadata, 10))
+ .build());
+ }
+
+ @Test
+ public void logRecvInitialMetadata_server() throws Exception {
+ InetAddress address = InetAddress.getByName("127.0.0.1");
+ int port = 12345;
+ InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+ binaryLog.logRecvInitialMetadata(metadata, IS_SERVER, CALL_ID, socketAddress);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA)
+ .setLogger(GrpcLogEntry.Logger.SERVER)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setPeer(BinaryLog.socketToProto(socketAddress))
+ .setMetadata(BinaryLog.metadataToProto(metadata, 10))
+ .build());
+ }
+
+ @Test
+ public void logRecvInitialMetadata_client() throws Exception {
+ InetAddress address = InetAddress.getByName("127.0.0.1");
+ int port = 12345;
+ InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+ binaryLog.logRecvInitialMetadata(metadata, IS_CLIENT, CALL_ID, socketAddress);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA)
+ .setLogger(GrpcLogEntry.Logger.CLIENT)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setPeer(BinaryLog.socketToProto(socketAddress))
+ .setMetadata(BinaryLog.metadataToProto(metadata, 10))
+ .build());
+ }
+
+ @Test
+ public void logTrailingMetadata_server() throws Exception {
+ binaryLog.logTrailingMetadata(metadata, IS_SERVER, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.SEND_TRAILING_METADATA)
+ .setLogger(GrpcLogEntry.Logger.SERVER)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMetadata(BinaryLog.metadataToProto(metadata, 10))
+ .build());
+ }
+
+ @Test
+ public void logTrailingMetadata_client() throws Exception {
+ binaryLog.logTrailingMetadata(metadata, IS_CLIENT, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.RECV_TRAILING_METADATA)
+ .setLogger(GrpcLogEntry.Logger.CLIENT)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMetadata(BinaryLog.metadataToProto(metadata, 10))
+ .build());
+ }
+
+ @Test
+ public void logOutboundMessage_server() throws Exception {
+ binaryLog.logOutboundMessage(message, IS_COMPRESSED, IS_SERVER, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.SEND_MESSAGE)
+ .setLogger(GrpcLogEntry.Logger.SERVER)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT))
+ .build());
+
+ binaryLog.logOutboundMessage(message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.SEND_MESSAGE)
+ .setLogger(GrpcLogEntry.Logger.SERVER)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMessage(BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT))
+ .build());
+ verifyNoMoreInteractions(sink);
+ }
+
+ @Test
+ public void logOutboundMessage_client() throws Exception {
+ binaryLog.logOutboundMessage(message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.SEND_MESSAGE)
+ .setLogger(GrpcLogEntry.Logger.CLIENT)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT))
+ .build());
+
+ binaryLog.logOutboundMessage(message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.SEND_MESSAGE)
+ .setLogger(GrpcLogEntry.Logger.CLIENT)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMessage(BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT))
+ .build());
+ verifyNoMoreInteractions(sink);
+ }
+
+ @Test
+ public void logInboundMessage_server() throws Exception {
+ binaryLog.logInboundMessage(message, IS_COMPRESSED, IS_SERVER, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.RECV_MESSAGE)
+ .setLogger(GrpcLogEntry.Logger.SERVER)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT))
+ .build());
+
+ binaryLog.logInboundMessage(message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.RECV_MESSAGE)
+ .setLogger(GrpcLogEntry.Logger.SERVER)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMessage(BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT))
+ .build());
+ verifyNoMoreInteractions(sink);
+ }
+
+ @Test
+ public void logInboundMessage_client() throws Exception {
+ binaryLog.logInboundMessage(message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.RECV_MESSAGE)
+ .setLogger(GrpcLogEntry.Logger.CLIENT)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT))
+ .build());
+
+ binaryLog.logInboundMessage(message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
+ verify(sink).write(
+ GrpcLogEntry
+ .newBuilder()
+ .setType(GrpcLogEntry.Type.RECV_MESSAGE)
+ .setLogger(GrpcLogEntry.Logger.CLIENT)
+ .setCallId(BinaryLog.callIdToProto(CALL_ID))
+ .setMessage(BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT))
+ .build());
+ verifyNoMoreInteractions(sink);
+ }
+
/** A builder class to make unit test code more readable. */
private static final class Builder {
int maxHeaderBytes = 0;
@@ -477,7 +706,20 @@ public final class BinaryLogTest {
}
BinaryLog build() {
- return new BinaryLog(maxHeaderBytes, maxMessageBytes);
+ return new BinaryLog(mock(BinaryLogSink.class), maxHeaderBytes, maxMessageBytes);
}
}
+
+ private static void assertSameLimits(BinaryLog a, BinaryLog b) {
+ assertEquals(a.maxMessageBytes, b.maxMessageBytes);
+ assertEquals(a.maxHeaderBytes, b.maxHeaderBytes);
+ }
+
+ private BinaryLog makeLog(String factoryConfigStr, String lookup) {
+ return new BinaryLog.FactoryImpl(sink, factoryConfigStr).getLog(lookup);
+ }
+
+ private BinaryLog makeLog(String logConfigStr) {
+ return FactoryImpl.createBinaryLog(sink, logConfigStr);
+ }
}