From 722acb151a9ecb2eb2b65cb079165d4ef8d69b1e Mon Sep 17 00:00:00 2001 From: zpencer Date: Wed, 2 May 2018 18:11:21 -0700 Subject: services: BinaryLog is an overused term, rename to BinlogHelper (#4429) This class really is an internal helper class for the implementation. --- .../src/main/java/io/grpc/services/BinaryLog.java | 607 ------------- .../io/grpc/services/BinaryLogProviderImpl.java | 6 +- .../main/java/io/grpc/services/BinlogHelper.java | 607 +++++++++++++ .../test/java/io/grpc/services/BinaryLogTest.java | 974 --------------------- .../java/io/grpc/services/BinlogHelperTest.java | 974 +++++++++++++++++++++ 5 files changed, 1584 insertions(+), 1584 deletions(-) delete mode 100644 services/src/main/java/io/grpc/services/BinaryLog.java create mode 100644 services/src/main/java/io/grpc/services/BinlogHelper.java delete mode 100644 services/src/test/java/io/grpc/services/BinaryLogTest.java create mode 100644 services/src/test/java/io/grpc/services/BinlogHelperTest.java (limited to 'services') diff --git a/services/src/main/java/io/grpc/services/BinaryLog.java b/services/src/main/java/io/grpc/services/BinaryLog.java deleted file mode 100644 index 421a5f1e1..000000000 --- a/services/src/main/java/io/grpc/services/BinaryLog.java +++ /dev/null @@ -1,607 +0,0 @@ -/* - * Copyright 2017, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.services; - -import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.primitives.Bytes; -import com.google.protobuf.ByteString; -import io.grpc.Attributes; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; -import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; -import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; -import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; -import io.grpc.Grpc; -import io.grpc.InternalMetadata; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.MethodDescriptor.Marshaller; -import io.grpc.ServerCall; -import io.grpc.ServerCall.Listener; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.Status; -import io.grpc.binarylog.GrpcLogEntry; -import io.grpc.binarylog.GrpcLogEntry.Type; -import io.grpc.binarylog.Message; -import io.grpc.binarylog.Metadata.Builder; -import io.grpc.binarylog.MetadataEntry; -import io.grpc.binarylog.Peer; -import io.grpc.binarylog.Peer.PeerType; -import io.grpc.binarylog.Uint128; -import io.grpc.internal.BinaryLogProvider.CallId; -import java.net.Inet4Address; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.charset.Charset; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.Nullable; -import javax.annotation.concurrent.ThreadSafe; - -/** - * A binary log class that is configured for a specific {@link MethodDescriptor}. - */ -@ThreadSafe -final class BinaryLog { - private static final Logger logger = Logger.getLogger(BinaryLog.class.getName()); - private static final int IP_PORT_BYTES = 2; - private static final int IP_PORT_UPPER_MASK = 0xff00; - private static final int IP_PORT_LOWER_MASK = 0xff; - private static final boolean SERVER = true; - private static final boolean CLIENT = false; - - @VisibleForTesting - static final CallId emptyCallId = new CallId(0, 0); - @VisibleForTesting - static final SocketAddress DUMMY_SOCKET = new DummySocketAddress(); - @VisibleForTesting - static final boolean DUMMY_IS_COMPRESSED = false; - - @VisibleForTesting - final SinkWriter writer; - - @VisibleForTesting - BinaryLog(SinkWriter writer) { - this.writer = writer; - } - - // TODO(zpencer): move proto related static helpers into this class - static final class SinkWriterImpl extends SinkWriter { - private final BinaryLogSink sink; - private final int maxHeaderBytes; - private final int maxMessageBytes; - - SinkWriterImpl(BinaryLogSink sink, int maxHeaderBytes, int maxMessageBytes) { - this.sink = sink; - this.maxHeaderBytes = maxHeaderBytes; - this.maxMessageBytes = maxMessageBytes; - } - - @Override - void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId) { - GrpcLogEntry entry = GrpcLogEntry - .newBuilder() - .setType(Type.SEND_INITIAL_METADATA) - .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) - .setCallId(callIdToProto(callId)) - .setMetadata(metadataToProto(metadata, maxHeaderBytes)) - .build(); - sink.write(entry); - } - - @Override - void logRecvInitialMetadata( - Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) { - GrpcLogEntry entry = GrpcLogEntry - .newBuilder() - .setType(Type.RECV_INITIAL_METADATA) - .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) - .setCallId(callIdToProto(callId)) - .setPeer(socketToProto(peerSocket)) - .setMetadata(metadataToProto(metadata, maxHeaderBytes)) - .build(); - sink.write(entry); - } - - @Override - void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId) { - GrpcLogEntry entry = GrpcLogEntry - .newBuilder() - .setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA) - .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) - .setCallId(callIdToProto(callId)) - .setMetadata(metadataToProto(metadata, maxHeaderBytes)) - .build(); - sink.write(entry); - } - - @Override - void logOutboundMessage( - Marshaller marshaller, - T message, - boolean compressed, - boolean isServer, - CallId callId) { - if (marshaller != BYTEARRAY_MARSHALLER) { - throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); - } - byte[] bytes = (byte[]) message; - GrpcLogEntry entry = GrpcLogEntry - .newBuilder() - .setType(Type.SEND_MESSAGE) - .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) - .setCallId(callIdToProto(callId)) - .setMessage(messageToProto(bytes, compressed, maxMessageBytes)) - .build(); - sink.write(entry); - } - - @Override - void logInboundMessage( - Marshaller marshaller, - T message, - boolean compressed, - boolean isServer, - CallId callId) { - if (marshaller != BYTEARRAY_MARSHALLER) { - throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); - } - byte[] bytes = (byte[]) message; - GrpcLogEntry entry = GrpcLogEntry - .newBuilder() - .setType(Type.RECV_MESSAGE) - .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) - .setCallId(callIdToProto(callId)) - .setMessage(messageToProto(bytes, compressed, maxMessageBytes)) - .build(); - sink.write(entry); - } - - @Override - int getMaxHeaderBytes() { - return maxHeaderBytes; - } - - @Override - int getMaxMessageBytes() { - return maxMessageBytes; - } - } - - abstract static class SinkWriter { - /** - * Logs the sending of initial metadata. This method logs the appropriate number of bytes - * as determined by the binary logging configuration. - */ - abstract void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId); - - /** - * Logs the receiving of initial metadata. This method logs the appropriate number of bytes - * as determined by the binary logging configuration. - */ - abstract void logRecvInitialMetadata( - Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket); - - /** - * Logs the trailing metadata. This method logs the appropriate number of bytes - * as determined by the binary logging configuration. - */ - abstract void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId); - - /** - * Logs the outbound message. This method logs the appropriate number of bytes from - * {@code message}, and returns a duplicate of the message. - * The number of bytes logged is determined by the binary logging configuration. - * This method takes ownership of {@code message}. - */ - abstract void logOutboundMessage( - Marshaller marshaller, T message, boolean compressed, boolean isServer, CallId callId); - - /** - * Logs the inbound message. This method logs the appropriate number of bytes from - * {@code message}, and returns a duplicate of the message. - * The number of bytes logged is determined by the binary logging configuration. - * This method takes ownership of {@code message}. - */ - abstract void logInboundMessage( - Marshaller marshaller, T message, boolean compressed, boolean isServer, CallId callId); - - /** - * Returns the number bytes of the header this writer will log, according to configuration. - */ - abstract int getMaxHeaderBytes(); - - /** - * Returns the number bytes of the message this writer will log, according to configuration. - */ - abstract int getMaxMessageBytes(); - } - - static SocketAddress getPeerSocket(Attributes streamAttributes) { - SocketAddress peer = streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); - if (peer == null) { - return DUMMY_SOCKET; - } - return peer; - } - - public ClientInterceptor getClientInterceptor(final CallId callId) { - return new ClientInterceptor() { - @Override - public ClientCall interceptCall( - final MethodDescriptor method, CallOptions callOptions, Channel next) { - return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { - @Override - public void start(Listener responseListener, Metadata headers) { - writer.logSendInitialMetadata(headers, CLIENT, callId); - ClientCall.Listener wListener = - new SimpleForwardingClientCallListener(responseListener) { - @Override - public void onMessage(RespT message) { - writer.logInboundMessage( - method.getResponseMarshaller(), - message, - DUMMY_IS_COMPRESSED, - CLIENT, - callId); - super.onMessage(message); - } - - @Override - public void onHeaders(Metadata headers) { - SocketAddress peer = getPeerSocket(getAttributes()); - writer.logRecvInitialMetadata(headers, CLIENT, callId, peer); - super.onHeaders(headers); - } - - @Override - public void onClose(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, CLIENT, callId); - super.onClose(status, trailers); - } - }; - super.start(wListener, headers); - } - - @Override - public void sendMessage(ReqT message) { - writer.logOutboundMessage( - method.getRequestMarshaller(), - message, - DUMMY_IS_COMPRESSED, - CLIENT, - callId); - super.sendMessage(message); - } - }; - } - }; - } - - public ServerInterceptor getServerInterceptor(final CallId callId) { - return new ServerInterceptor() { - @Override - public Listener interceptCall( - final ServerCall call, - Metadata headers, - ServerCallHandler next) { - SocketAddress peer = getPeerSocket(call.getAttributes()); - writer.logRecvInitialMetadata(headers, SERVER, callId, peer); - ServerCall wCall = new SimpleForwardingServerCall(call) { - @Override - public void sendMessage(RespT message) { - writer.logOutboundMessage( - call.getMethodDescriptor().getResponseMarshaller(), - message, - DUMMY_IS_COMPRESSED, - SERVER, - callId); - super.sendMessage(message); - } - - @Override - public void sendHeaders(Metadata headers) { - writer.logSendInitialMetadata(headers, SERVER, callId); - super.sendHeaders(headers); - } - - @Override - public void close(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, SERVER, callId); - super.close(status, trailers); - } - }; - - return new SimpleForwardingServerCallListener(next.startCall(wCall, headers)) { - @Override - public void onMessage(ReqT message) { - writer.logInboundMessage( - call.getMethodDescriptor().getRequestMarshaller(), - message, - DUMMY_IS_COMPRESSED, - SERVER, - callId); - super.onMessage(message); - } - }; - } - }; - } - - interface Factory { - @Nullable - BinaryLog getLog(String fullMethodName); - } - - static final class FactoryImpl implements Factory { - // '*' for global, 'service/*' for service glob, or 'service/method' for fully qualified. - private static final Pattern logPatternRe = Pattern.compile("[^{]+"); - // A curly brace wrapped expression. Will be further matched with the more specified REs below. - private static final Pattern logOptionsRe = Pattern.compile("\\{[^}]+}"); - private static final Pattern configRe = Pattern.compile( - String.format("^(%s)(%s)?$", logPatternRe.pattern(), logOptionsRe.pattern())); - // Regexes to extract per-binlog options - // The form: {m:256} - private static final Pattern msgRe = Pattern.compile("\\{m(?::(\\d+))?}"); - // The form: {h:256} - private static final Pattern headerRe = Pattern.compile("\\{h(?::(\\d+))?}"); - // The form: {h:256,m:256} - private static final Pattern bothRe = Pattern.compile("\\{h(?::(\\d+))?;m(?::(\\d+))?}"); - - private final BinaryLog globalLog; - private final Map perServiceLogs; - private final Map perMethodLogs; - - /** - * Accepts a string in the format specified by the binary log spec. - */ - @VisibleForTesting - FactoryImpl(BinaryLogSink sink, String configurationString) { - Preconditions.checkNotNull(sink); - Preconditions.checkState(configurationString != null && configurationString.length() > 0); - BinaryLog globalLog = null; - Map perServiceLogs = new HashMap(); - Map perMethodLogs = new HashMap(); - - for (String configuration : Splitter.on(',').split(configurationString)) { - Matcher configMatcher = configRe.matcher(configuration); - if (!configMatcher.matches()) { - throw new IllegalArgumentException("Bad input: " + configuration); - } - String methodOrSvc = configMatcher.group(1); - String binlogOptionStr = configMatcher.group(2); - BinaryLog binLog = createBinaryLog(sink, binlogOptionStr); - if (binLog == null) { - continue; - } - if (methodOrSvc.equals("*")) { - if (globalLog != null) { - logger.log(Level.SEVERE, "Ignoring duplicate entry: " + configuration); - continue; - } - globalLog = binLog; - logger.info("Global binlog: " + globalLog); - } else if (isServiceGlob(methodOrSvc)) { - String service = MethodDescriptor.extractFullServiceName(methodOrSvc); - if (perServiceLogs.containsKey(service)) { - logger.log(Level.SEVERE, "Ignoring duplicate entry: " + configuration); - continue; - } - perServiceLogs.put(service, binLog); - logger.info(String.format("Service binlog: service=%s log=%s", service, binLog)); - } else { - // assume fully qualified method name - if (perMethodLogs.containsKey(methodOrSvc)) { - logger.log(Level.SEVERE, "Ignoring duplicate entry: " + configuration); - continue; - } - perMethodLogs.put(methodOrSvc, binLog); - logger.info(String.format("Method binlog: method=%s log=%s", methodOrSvc, binLog)); - } - } - this.globalLog = globalLog; - this.perServiceLogs = Collections.unmodifiableMap(perServiceLogs); - this.perMethodLogs = Collections.unmodifiableMap(perMethodLogs); - } - - /** - * Accepts a full method name and returns the log that should be used. - */ - @Override - public BinaryLog getLog(String fullMethodName) { - BinaryLog methodLog = perMethodLogs.get(fullMethodName); - if (methodLog != null) { - return methodLog; - } - BinaryLog serviceLog = perServiceLogs.get( - MethodDescriptor.extractFullServiceName(fullMethodName)); - if (serviceLog != null) { - return serviceLog; - } - return globalLog; - } - - /** - * Returns a binlog with the correct header and message limits or {@code null} if the input - * is malformed. The input should be a string that is in one of these forms: - * - *

{@code {h(:\d+)?}, {m(:\d+)?}, {h(:\d+)?,m(:\d+)?}} - * - *

If the {@code logConfig} is null, the returned binlog will have a limit of - * Integer.MAX_VALUE. - */ - @VisibleForTesting - @Nullable - static BinaryLog createBinaryLog(BinaryLogSink sink, @Nullable String logConfig) { - if (logConfig == null) { - return new BinaryLog( - new SinkWriterImpl(sink, Integer.MAX_VALUE, Integer.MAX_VALUE)); - } - try { - Matcher headerMatcher; - Matcher msgMatcher; - Matcher bothMatcher; - final int maxHeaderBytes; - final int maxMsgBytes; - if ((headerMatcher = headerRe.matcher(logConfig)).matches()) { - String maxHeaderStr = headerMatcher.group(1); - maxHeaderBytes = - maxHeaderStr != null ? Integer.parseInt(maxHeaderStr) : Integer.MAX_VALUE; - maxMsgBytes = 0; - } else if ((msgMatcher = msgRe.matcher(logConfig)).matches()) { - maxHeaderBytes = 0; - String maxMsgStr = msgMatcher.group(1); - maxMsgBytes = maxMsgStr != null ? Integer.parseInt(maxMsgStr) : Integer.MAX_VALUE; - } else if ((bothMatcher = bothRe.matcher(logConfig)).matches()) { - String maxHeaderStr = bothMatcher.group(1); - String maxMsgStr = bothMatcher.group(2); - maxHeaderBytes = - maxHeaderStr != null ? Integer.parseInt(maxHeaderStr) : Integer.MAX_VALUE; - maxMsgBytes = maxMsgStr != null ? Integer.parseInt(maxMsgStr) : Integer.MAX_VALUE; - } else { - logger.log(Level.SEVERE, "Illegal log config pattern: " + logConfig); - return null; - } - return new BinaryLog(new SinkWriterImpl(sink, maxHeaderBytes, maxMsgBytes)); - } catch (NumberFormatException e) { - logger.log(Level.SEVERE, "Illegal log config pattern: " + logConfig); - return null; - } - } - - /** - * Returns true if the input string is a glob of the form: {@code /*}. - */ - static boolean isServiceGlob(String input) { - return input.endsWith("/*"); - } - } - - /** - * Returns a {@link Uint128} from a CallId. - */ - static Uint128 callIdToProto(CallId callId) { - Preconditions.checkNotNull(callId); - return Uint128 - .newBuilder() - .setHigh(callId.hi) - .setLow(callId.lo) - .build(); - } - - @VisibleForTesting - // TODO(zpencer): the binlog design does not specify how to actually express the peer bytes - static Peer socketToProto(SocketAddress address) { - Preconditions.checkNotNull(address); - PeerType peerType = PeerType.UNKNOWN_PEERTYPE; - byte[] peerAddress = null; - - if (address instanceof InetSocketAddress) { - InetAddress inetAddress = ((InetSocketAddress) address).getAddress(); - if (inetAddress instanceof Inet4Address) { - peerType = PeerType.PEER_IPV4; - } else if (inetAddress instanceof Inet6Address) { - peerType = PeerType.PEER_IPV6; - } else { - logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address); - } - int port = ((InetSocketAddress) address).getPort(); - byte[] portBytes = new byte[IP_PORT_BYTES]; - portBytes[0] = (byte) ((port & IP_PORT_UPPER_MASK) >> 8); - portBytes[1] = (byte) (port & IP_PORT_LOWER_MASK); - peerAddress = Bytes.concat(inetAddress.getAddress(), portBytes); - } else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) { - // To avoid a compile time dependency on grpc-netty, we check against the runtime class name. - peerType = PeerType.PEER_UNIX; - } - if (peerAddress == null) { - peerAddress = address.toString().getBytes(Charset.defaultCharset()); - } - return Peer.newBuilder() - .setPeerType(peerType) - .setPeer(ByteString.copyFrom(peerAddress)) - .build(); - } - - @VisibleForTesting - static io.grpc.binarylog.Metadata metadataToProto(Metadata metadata, int maxHeaderBytes) { - Preconditions.checkNotNull(metadata); - Preconditions.checkState(maxHeaderBytes >= 0); - Builder builder = io.grpc.binarylog.Metadata.newBuilder(); - // This code is tightly coupled with Metadata's implementation - byte[][] serialized; - if (maxHeaderBytes > 0 && (serialized = InternalMetadata.serialize(metadata)) != null) { - int written = 0; - for (int i = 0; i < serialized.length && written < maxHeaderBytes; i += 2) { - byte[] key = serialized[i]; - byte[] value = serialized[i + 1]; - if (written + key.length + value.length <= maxHeaderBytes) { - builder.addEntry( - MetadataEntry - .newBuilder() - .setKey(ByteString.copyFrom(key)) - .setValue(ByteString.copyFrom(value)) - .build()); - written += key.length; - written += value.length; - } - } - } - return builder.build(); - } - - @VisibleForTesting - static Message messageToProto(byte[] message, boolean compressed, int maxMessageBytes) { - Preconditions.checkNotNull(message); - Message.Builder builder = Message - .newBuilder() - .setFlags(flagsForMessage(compressed)) - .setLength(message.length); - if (maxMessageBytes > 0) { - int desiredBytes = Math.min(maxMessageBytes, message.length); - builder.setData(ByteString.copyFrom(message, 0, desiredBytes)); - } - return builder.build(); - } - - /** - * Returns a flag based on the arguments. - */ - @VisibleForTesting - static int flagsForMessage(boolean compressed) { - return compressed ? 1 : 0; - } - - private static class DummySocketAddress extends SocketAddress { - private static final long serialVersionUID = 0; - } -} diff --git a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java index 764f63c8c..b87a711c1 100644 --- a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java +++ b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java @@ -32,7 +32,7 @@ import javax.annotation.Nullable; @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017") public class BinaryLogProviderImpl extends BinaryLogProvider { private static final Logger logger = Logger.getLogger(BinaryLogProviderImpl.class.getName()); - private final BinaryLog.Factory factory; + private final BinlogHelper.Factory factory; private final AtomicLong counter = new AtomicLong(); public BinaryLogProviderImpl() { @@ -40,9 +40,9 @@ public class BinaryLogProviderImpl extends BinaryLogProvider { } BinaryLogProviderImpl(BinaryLogSink sink, String configStr) { - BinaryLog.Factory factory = null; + BinlogHelper.Factory factory = null; try { - factory = new BinaryLog.FactoryImpl(sink, configStr); + factory = new BinlogHelper.FactoryImpl(sink, configStr); } catch (RuntimeException e) { logger.log(Level.SEVERE, "Caught exception, binary log will be disabled", e); } catch (Error err) { diff --git a/services/src/main/java/io/grpc/services/BinlogHelper.java b/services/src/main/java/io/grpc/services/BinlogHelper.java new file mode 100644 index 000000000..0eae0b2eb --- /dev/null +++ b/services/src/main/java/io/grpc/services/BinlogHelper.java @@ -0,0 +1,607 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.primitives.Bytes; +import com.google.protobuf.ByteString; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; +import io.grpc.Grpc; +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.binarylog.GrpcLogEntry; +import io.grpc.binarylog.GrpcLogEntry.Type; +import io.grpc.binarylog.Message; +import io.grpc.binarylog.Metadata.Builder; +import io.grpc.binarylog.MetadataEntry; +import io.grpc.binarylog.Peer; +import io.grpc.binarylog.Peer.PeerType; +import io.grpc.binarylog.Uint128; +import io.grpc.internal.BinaryLogProvider.CallId; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A binary log class that is configured for a specific {@link MethodDescriptor}. + */ +@ThreadSafe +final class BinlogHelper { + private static final Logger logger = Logger.getLogger(BinlogHelper.class.getName()); + private static final int IP_PORT_BYTES = 2; + private static final int IP_PORT_UPPER_MASK = 0xff00; + private static final int IP_PORT_LOWER_MASK = 0xff; + private static final boolean SERVER = true; + private static final boolean CLIENT = false; + + @VisibleForTesting + static final CallId emptyCallId = new CallId(0, 0); + @VisibleForTesting + static final SocketAddress DUMMY_SOCKET = new DummySocketAddress(); + @VisibleForTesting + static final boolean DUMMY_IS_COMPRESSED = false; + + @VisibleForTesting + final SinkWriter writer; + + @VisibleForTesting + BinlogHelper(SinkWriter writer) { + this.writer = writer; + } + + // TODO(zpencer): move proto related static helpers into this class + static final class SinkWriterImpl extends SinkWriter { + private final BinaryLogSink sink; + private final int maxHeaderBytes; + private final int maxMessageBytes; + + SinkWriterImpl(BinaryLogSink sink, int maxHeaderBytes, int maxMessageBytes) { + this.sink = sink; + this.maxHeaderBytes = maxHeaderBytes; + this.maxMessageBytes = maxMessageBytes; + } + + @Override + void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId) { + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(Type.SEND_INITIAL_METADATA) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setMetadata(metadataToProto(metadata, maxHeaderBytes)) + .build(); + sink.write(entry); + } + + @Override + void logRecvInitialMetadata( + Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) { + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(Type.RECV_INITIAL_METADATA) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setPeer(socketToProto(peerSocket)) + .setMetadata(metadataToProto(metadata, maxHeaderBytes)) + .build(); + sink.write(entry); + } + + @Override + void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId) { + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setMetadata(metadataToProto(metadata, maxHeaderBytes)) + .build(); + sink.write(entry); + } + + @Override + void logOutboundMessage( + Marshaller marshaller, + T message, + boolean compressed, + boolean isServer, + CallId callId) { + if (marshaller != BYTEARRAY_MARSHALLER) { + throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); + } + byte[] bytes = (byte[]) message; + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(Type.SEND_MESSAGE) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setMessage(messageToProto(bytes, compressed, maxMessageBytes)) + .build(); + sink.write(entry); + } + + @Override + void logInboundMessage( + Marshaller marshaller, + T message, + boolean compressed, + boolean isServer, + CallId callId) { + if (marshaller != BYTEARRAY_MARSHALLER) { + throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); + } + byte[] bytes = (byte[]) message; + GrpcLogEntry entry = GrpcLogEntry + .newBuilder() + .setType(Type.RECV_MESSAGE) + .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) + .setCallId(callIdToProto(callId)) + .setMessage(messageToProto(bytes, compressed, maxMessageBytes)) + .build(); + sink.write(entry); + } + + @Override + int getMaxHeaderBytes() { + return maxHeaderBytes; + } + + @Override + int getMaxMessageBytes() { + return maxMessageBytes; + } + } + + abstract static class SinkWriter { + /** + * Logs the sending of initial metadata. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + abstract void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId); + + /** + * Logs the receiving of initial metadata. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + abstract void logRecvInitialMetadata( + Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket); + + /** + * Logs the trailing metadata. This method logs the appropriate number of bytes + * as determined by the binary logging configuration. + */ + abstract void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId); + + /** + * Logs the outbound message. This method logs the appropriate number of bytes from + * {@code message}, and returns a duplicate of the message. + * The number of bytes logged is determined by the binary logging configuration. + * This method takes ownership of {@code message}. + */ + abstract void logOutboundMessage( + Marshaller marshaller, T message, boolean compressed, boolean isServer, CallId callId); + + /** + * Logs the inbound message. This method logs the appropriate number of bytes from + * {@code message}, and returns a duplicate of the message. + * The number of bytes logged is determined by the binary logging configuration. + * This method takes ownership of {@code message}. + */ + abstract void logInboundMessage( + Marshaller marshaller, T message, boolean compressed, boolean isServer, CallId callId); + + /** + * Returns the number bytes of the header this writer will log, according to configuration. + */ + abstract int getMaxHeaderBytes(); + + /** + * Returns the number bytes of the message this writer will log, according to configuration. + */ + abstract int getMaxMessageBytes(); + } + + static SocketAddress getPeerSocket(Attributes streamAttributes) { + SocketAddress peer = streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + if (peer == null) { + return DUMMY_SOCKET; + } + return peer; + } + + public ClientInterceptor getClientInterceptor(final CallId callId) { + return new ClientInterceptor() { + @Override + public ClientCall interceptCall( + final MethodDescriptor method, CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + writer.logSendInitialMetadata(headers, CLIENT, callId); + ClientCall.Listener wListener = + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onMessage(RespT message) { + writer.logInboundMessage( + method.getResponseMarshaller(), + message, + DUMMY_IS_COMPRESSED, + CLIENT, + callId); + super.onMessage(message); + } + + @Override + public void onHeaders(Metadata headers) { + SocketAddress peer = getPeerSocket(getAttributes()); + writer.logRecvInitialMetadata(headers, CLIENT, callId, peer); + super.onHeaders(headers); + } + + @Override + public void onClose(Status status, Metadata trailers) { + writer.logTrailingMetadata(trailers, CLIENT, callId); + super.onClose(status, trailers); + } + }; + super.start(wListener, headers); + } + + @Override + public void sendMessage(ReqT message) { + writer.logOutboundMessage( + method.getRequestMarshaller(), + message, + DUMMY_IS_COMPRESSED, + CLIENT, + callId); + super.sendMessage(message); + } + }; + } + }; + } + + public ServerInterceptor getServerInterceptor(final CallId callId) { + return new ServerInterceptor() { + @Override + public Listener interceptCall( + final ServerCall call, + Metadata headers, + ServerCallHandler next) { + SocketAddress peer = getPeerSocket(call.getAttributes()); + writer.logRecvInitialMetadata(headers, SERVER, callId, peer); + ServerCall wCall = new SimpleForwardingServerCall(call) { + @Override + public void sendMessage(RespT message) { + writer.logOutboundMessage( + call.getMethodDescriptor().getResponseMarshaller(), + message, + DUMMY_IS_COMPRESSED, + SERVER, + callId); + super.sendMessage(message); + } + + @Override + public void sendHeaders(Metadata headers) { + writer.logSendInitialMetadata(headers, SERVER, callId); + super.sendHeaders(headers); + } + + @Override + public void close(Status status, Metadata trailers) { + writer.logTrailingMetadata(trailers, SERVER, callId); + super.close(status, trailers); + } + }; + + return new SimpleForwardingServerCallListener(next.startCall(wCall, headers)) { + @Override + public void onMessage(ReqT message) { + writer.logInboundMessage( + call.getMethodDescriptor().getRequestMarshaller(), + message, + DUMMY_IS_COMPRESSED, + SERVER, + callId); + super.onMessage(message); + } + }; + } + }; + } + + interface Factory { + @Nullable + BinlogHelper getLog(String fullMethodName); + } + + static final class FactoryImpl implements Factory { + // '*' for global, 'service/*' for service glob, or 'service/method' for fully qualified. + private static final Pattern logPatternRe = Pattern.compile("[^{]+"); + // A curly brace wrapped expression. Will be further matched with the more specified REs below. + private static final Pattern logOptionsRe = Pattern.compile("\\{[^}]+}"); + private static final Pattern configRe = Pattern.compile( + String.format("^(%s)(%s)?$", logPatternRe.pattern(), logOptionsRe.pattern())); + // Regexes to extract per-binlog options + // The form: {m:256} + private static final Pattern msgRe = Pattern.compile("\\{m(?::(\\d+))?}"); + // The form: {h:256} + private static final Pattern headerRe = Pattern.compile("\\{h(?::(\\d+))?}"); + // The form: {h:256,m:256} + private static final Pattern bothRe = Pattern.compile("\\{h(?::(\\d+))?;m(?::(\\d+))?}"); + + private final BinlogHelper globalLog; + private final Map perServiceLogs; + private final Map perMethodLogs; + + /** + * Accepts a string in the format specified by the binary log spec. + */ + @VisibleForTesting + FactoryImpl(BinaryLogSink sink, String configurationString) { + Preconditions.checkNotNull(sink); + Preconditions.checkState(configurationString != null && configurationString.length() > 0); + BinlogHelper globalLog = null; + Map perServiceLogs = new HashMap(); + Map perMethodLogs = new HashMap(); + + for (String configuration : Splitter.on(',').split(configurationString)) { + Matcher configMatcher = configRe.matcher(configuration); + if (!configMatcher.matches()) { + throw new IllegalArgumentException("Bad input: " + configuration); + } + String methodOrSvc = configMatcher.group(1); + String binlogOptionStr = configMatcher.group(2); + BinlogHelper binLog = createBinaryLog(sink, binlogOptionStr); + if (binLog == null) { + continue; + } + if (methodOrSvc.equals("*")) { + if (globalLog != null) { + logger.log(Level.SEVERE, "Ignoring duplicate entry: " + configuration); + continue; + } + globalLog = binLog; + logger.info("Global binlog: " + globalLog); + } else if (isServiceGlob(methodOrSvc)) { + String service = MethodDescriptor.extractFullServiceName(methodOrSvc); + if (perServiceLogs.containsKey(service)) { + logger.log(Level.SEVERE, "Ignoring duplicate entry: " + configuration); + continue; + } + perServiceLogs.put(service, binLog); + logger.info(String.format("Service binlog: service=%s log=%s", service, binLog)); + } else { + // assume fully qualified method name + if (perMethodLogs.containsKey(methodOrSvc)) { + logger.log(Level.SEVERE, "Ignoring duplicate entry: " + configuration); + continue; + } + perMethodLogs.put(methodOrSvc, binLog); + logger.info(String.format("Method binlog: method=%s log=%s", methodOrSvc, binLog)); + } + } + this.globalLog = globalLog; + this.perServiceLogs = Collections.unmodifiableMap(perServiceLogs); + this.perMethodLogs = Collections.unmodifiableMap(perMethodLogs); + } + + /** + * Accepts a full method name and returns the log that should be used. + */ + @Override + public BinlogHelper getLog(String fullMethodName) { + BinlogHelper methodLog = perMethodLogs.get(fullMethodName); + if (methodLog != null) { + return methodLog; + } + BinlogHelper serviceLog = perServiceLogs.get( + MethodDescriptor.extractFullServiceName(fullMethodName)); + if (serviceLog != null) { + return serviceLog; + } + return globalLog; + } + + /** + * Returns a binlog with the correct header and message limits or {@code null} if the input + * is malformed. The input should be a string that is in one of these forms: + * + *

{@code {h(:\d+)?}, {m(:\d+)?}, {h(:\d+)?,m(:\d+)?}} + * + *

If the {@code logConfig} is null, the returned binlog will have a limit of + * Integer.MAX_VALUE. + */ + @VisibleForTesting + @Nullable + static BinlogHelper createBinaryLog(BinaryLogSink sink, @Nullable String logConfig) { + if (logConfig == null) { + return new BinlogHelper( + new SinkWriterImpl(sink, Integer.MAX_VALUE, Integer.MAX_VALUE)); + } + try { + Matcher headerMatcher; + Matcher msgMatcher; + Matcher bothMatcher; + final int maxHeaderBytes; + final int maxMsgBytes; + if ((headerMatcher = headerRe.matcher(logConfig)).matches()) { + String maxHeaderStr = headerMatcher.group(1); + maxHeaderBytes = + maxHeaderStr != null ? Integer.parseInt(maxHeaderStr) : Integer.MAX_VALUE; + maxMsgBytes = 0; + } else if ((msgMatcher = msgRe.matcher(logConfig)).matches()) { + maxHeaderBytes = 0; + String maxMsgStr = msgMatcher.group(1); + maxMsgBytes = maxMsgStr != null ? Integer.parseInt(maxMsgStr) : Integer.MAX_VALUE; + } else if ((bothMatcher = bothRe.matcher(logConfig)).matches()) { + String maxHeaderStr = bothMatcher.group(1); + String maxMsgStr = bothMatcher.group(2); + maxHeaderBytes = + maxHeaderStr != null ? Integer.parseInt(maxHeaderStr) : Integer.MAX_VALUE; + maxMsgBytes = maxMsgStr != null ? Integer.parseInt(maxMsgStr) : Integer.MAX_VALUE; + } else { + logger.log(Level.SEVERE, "Illegal log config pattern: " + logConfig); + return null; + } + return new BinlogHelper(new SinkWriterImpl(sink, maxHeaderBytes, maxMsgBytes)); + } catch (NumberFormatException e) { + logger.log(Level.SEVERE, "Illegal log config pattern: " + logConfig); + return null; + } + } + + /** + * Returns true if the input string is a glob of the form: {@code /*}. + */ + static boolean isServiceGlob(String input) { + return input.endsWith("/*"); + } + } + + /** + * Returns a {@link Uint128} from a CallId. + */ + static Uint128 callIdToProto(CallId callId) { + Preconditions.checkNotNull(callId); + return Uint128 + .newBuilder() + .setHigh(callId.hi) + .setLow(callId.lo) + .build(); + } + + @VisibleForTesting + // TODO(zpencer): the binlog design does not specify how to actually express the peer bytes + static Peer socketToProto(SocketAddress address) { + Preconditions.checkNotNull(address); + PeerType peerType = PeerType.UNKNOWN_PEERTYPE; + byte[] peerAddress = null; + + if (address instanceof InetSocketAddress) { + InetAddress inetAddress = ((InetSocketAddress) address).getAddress(); + if (inetAddress instanceof Inet4Address) { + peerType = PeerType.PEER_IPV4; + } else if (inetAddress instanceof Inet6Address) { + peerType = PeerType.PEER_IPV6; + } else { + logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address); + } + int port = ((InetSocketAddress) address).getPort(); + byte[] portBytes = new byte[IP_PORT_BYTES]; + portBytes[0] = (byte) ((port & IP_PORT_UPPER_MASK) >> 8); + portBytes[1] = (byte) (port & IP_PORT_LOWER_MASK); + peerAddress = Bytes.concat(inetAddress.getAddress(), portBytes); + } else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) { + // To avoid a compile time dependency on grpc-netty, we check against the runtime class name. + peerType = PeerType.PEER_UNIX; + } + if (peerAddress == null) { + peerAddress = address.toString().getBytes(Charset.defaultCharset()); + } + return Peer.newBuilder() + .setPeerType(peerType) + .setPeer(ByteString.copyFrom(peerAddress)) + .build(); + } + + @VisibleForTesting + static io.grpc.binarylog.Metadata metadataToProto(Metadata metadata, int maxHeaderBytes) { + Preconditions.checkNotNull(metadata); + Preconditions.checkState(maxHeaderBytes >= 0); + Builder builder = io.grpc.binarylog.Metadata.newBuilder(); + // This code is tightly coupled with Metadata's implementation + byte[][] serialized; + if (maxHeaderBytes > 0 && (serialized = InternalMetadata.serialize(metadata)) != null) { + int written = 0; + for (int i = 0; i < serialized.length && written < maxHeaderBytes; i += 2) { + byte[] key = serialized[i]; + byte[] value = serialized[i + 1]; + if (written + key.length + value.length <= maxHeaderBytes) { + builder.addEntry( + MetadataEntry + .newBuilder() + .setKey(ByteString.copyFrom(key)) + .setValue(ByteString.copyFrom(value)) + .build()); + written += key.length; + written += value.length; + } + } + } + return builder.build(); + } + + @VisibleForTesting + static Message messageToProto(byte[] message, boolean compressed, int maxMessageBytes) { + Preconditions.checkNotNull(message); + Message.Builder builder = Message + .newBuilder() + .setFlags(flagsForMessage(compressed)) + .setLength(message.length); + if (maxMessageBytes > 0) { + int desiredBytes = Math.min(maxMessageBytes, message.length); + builder.setData(ByteString.copyFrom(message, 0, desiredBytes)); + } + return builder.build(); + } + + /** + * Returns a flag based on the arguments. + */ + @VisibleForTesting + static int flagsForMessage(boolean compressed) { + return compressed ? 1 : 0; + } + + private static class DummySocketAddress extends SocketAddress { + private static final long serialVersionUID = 0; + } +} diff --git a/services/src/test/java/io/grpc/services/BinaryLogTest.java b/services/src/test/java/io/grpc/services/BinaryLogTest.java deleted file mode 100644 index 235f52073..000000000 --- a/services/src/test/java/io/grpc/services/BinaryLogTest.java +++ /dev/null @@ -1,974 +0,0 @@ -/* - * Copyright 2017, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.services; - -import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER; -import static io.grpc.services.BinaryLog.DUMMY_SOCKET; -import static io.grpc.services.BinaryLog.getPeerSocket; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - -import com.google.common.primitives.Bytes; -import com.google.protobuf.ByteString; -import io.grpc.Attributes; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.Grpc; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.MethodDescriptor.MethodType; -import io.grpc.ServerCall; -import io.grpc.ServerCallHandler; -import io.grpc.Status; -import io.grpc.binarylog.GrpcLogEntry; -import io.grpc.binarylog.Message; -import io.grpc.binarylog.MetadataEntry; -import io.grpc.binarylog.Peer; -import io.grpc.binarylog.Peer.PeerType; -import io.grpc.binarylog.Uint128; -import io.grpc.internal.BinaryLogProvider; -import io.grpc.internal.BinaryLogProvider.CallId; -import io.grpc.internal.NoopClientCall; -import io.grpc.internal.NoopServerCall; -import io.grpc.services.BinaryLog.FactoryImpl; -import io.grpc.services.BinaryLog.SinkWriter; -import io.grpc.services.BinaryLog.SinkWriterImpl; -import io.netty.channel.unix.DomainSocketAddress; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.concurrent.atomic.AtomicReference; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link BinaryLog}. */ -@RunWith(JUnit4.class) -public final class BinaryLogTest { - private static final Charset US_ASCII = Charset.forName("US-ASCII"); - private static final BinaryLog HEADER_FULL = new Builder().header(Integer.MAX_VALUE).build(); - private static final BinaryLog HEADER_256 = new Builder().header(256).build(); - private static final BinaryLog MSG_FULL = new Builder().msg(Integer.MAX_VALUE).build(); - private static final BinaryLog MSG_256 = new Builder().msg(256).build(); - private static final BinaryLog BOTH_256 = new Builder().header(256).msg(256).build(); - private static final BinaryLog BOTH_FULL = - new Builder().header(Integer.MAX_VALUE).msg(Integer.MAX_VALUE).build(); - - private static final String DATA_A = "aaaaaaaaa"; - private static final String DATA_B = "bbbbbbbbb"; - private static final String DATA_C = "ccccccccc"; - private static final Metadata.Key KEY_A = - Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER); - private static final Metadata.Key KEY_B = - Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); - private static final Metadata.Key KEY_C = - Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER); - private static final MetadataEntry ENTRY_A = - MetadataEntry - .newBuilder() - .setKey(ByteString.copyFrom(KEY_A.name(), US_ASCII)) - .setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII))) - .build(); - private static final MetadataEntry ENTRY_B = - MetadataEntry - .newBuilder() - .setKey(ByteString.copyFrom(KEY_B.name(), US_ASCII)) - .setValue(ByteString.copyFrom(DATA_B.getBytes(US_ASCII))) - .build(); - private static final MetadataEntry ENTRY_C = - MetadataEntry - .newBuilder() - .setKey(ByteString.copyFrom(KEY_C.name(), US_ASCII)) - .setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII))) - .build(); - private static final boolean IS_SERVER = true; - private static final boolean IS_CLIENT = false; - private static final boolean IS_COMPRESSED = true; - private static final boolean IS_UNCOMPRESSED = false; - // TODO(zpencer): rename this to callId, since byte[] is mutable - private static final CallId CALL_ID = - new CallId(0x1112131415161718L, 0x19101a1b1c1d1e1fL); - private static final int HEADER_LIMIT = 10; - private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; - - private final Metadata nonEmptyMetadata = new Metadata(); - private final BinaryLogSink sink = mock(BinaryLogSink.class); - private final SinkWriter sinkWriterImpl = - new SinkWriterImpl(sink, HEADER_LIMIT, MESSAGE_LIMIT); - private final SinkWriter mockSinkWriter = mock(SinkWriter.class); - private final byte[] message = new byte[100]; - private SocketAddress peer; - - @Before - public void setUp() throws Exception { - nonEmptyMetadata.put(KEY_A, DATA_A); - nonEmptyMetadata.put(KEY_B, DATA_B); - nonEmptyMetadata.put(KEY_C, DATA_C); - peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234); - } - - @Test - public void configBinLog_global() throws Exception { - assertSameLimits(BOTH_FULL, makeLog("*", "p.s/m")); - assertSameLimits(BOTH_FULL, makeLog("*{h;m}", "p.s/m")); - assertSameLimits(HEADER_FULL, makeLog("*{h}", "p.s/m")); - assertSameLimits(MSG_FULL, makeLog("*{m}", "p.s/m")); - assertSameLimits(HEADER_256, makeLog("*{h:256}", "p.s/m")); - assertSameLimits(MSG_256, makeLog("*{m:256}", "p.s/m")); - assertSameLimits(BOTH_256, makeLog("*{h:256;m:256}", "p.s/m")); - assertSameLimits( - new Builder().header(Integer.MAX_VALUE).msg(256).build(), - makeLog("*{h;m:256}", "p.s/m")); - assertSameLimits( - new Builder().header(256).msg(Integer.MAX_VALUE).build(), - makeLog("*{h:256;m}", "p.s/m")); - } - - @Test - public void configBinLog_method() throws Exception { - assertSameLimits(BOTH_FULL, makeLog("p.s/m", "p.s/m")); - assertSameLimits(BOTH_FULL, makeLog("p.s/m{h;m}", "p.s/m")); - assertSameLimits(HEADER_FULL, makeLog("p.s/m{h}", "p.s/m")); - assertSameLimits(MSG_FULL, makeLog("p.s/m{m}", "p.s/m")); - assertSameLimits(HEADER_256, makeLog("p.s/m{h:256}", "p.s/m")); - assertSameLimits(MSG_256, makeLog("p.s/m{m:256}", "p.s/m")); - assertSameLimits(BOTH_256, makeLog("p.s/m{h:256;m:256}", "p.s/m")); - assertSameLimits( - new Builder().header(Integer.MAX_VALUE).msg(256).build(), - makeLog("p.s/m{h;m:256}", "p.s/m")); - assertSameLimits( - new Builder().header(256).msg(Integer.MAX_VALUE).build(), - makeLog("p.s/m{h:256;m}", "p.s/m")); - } - - @Test - public void configBinLog_method_absent() throws Exception { - assertNull(makeLog("p.s/m", "p.s/absent")); - } - - @Test - public void configBinLog_service() throws Exception { - assertSameLimits(BOTH_FULL, makeLog("p.s/*", "p.s/m")); - assertSameLimits(BOTH_FULL, makeLog("p.s/*{h;m}", "p.s/m")); - assertSameLimits(HEADER_FULL, makeLog("p.s/*{h}", "p.s/m")); - assertSameLimits(MSG_FULL, makeLog("p.s/*{m}", "p.s/m")); - assertSameLimits(HEADER_256, makeLog("p.s/*{h:256}", "p.s/m")); - assertSameLimits(MSG_256, makeLog("p.s/*{m:256}", "p.s/m")); - assertSameLimits(BOTH_256, makeLog("p.s/*{h:256;m:256}", "p.s/m")); - assertSameLimits( - new Builder().header(Integer.MAX_VALUE).msg(256).build(), - makeLog("p.s/*{h;m:256}", "p.s/m")); - assertSameLimits( - new Builder().header(256).msg(Integer.MAX_VALUE).build(), - makeLog("p.s/*{h:256;m}", "p.s/m")); - } - - @Test - public void configBinLog_service_absent() throws Exception { - assertNull(makeLog("p.s/*", "p.other/m")); - } - - @Test - public void createLogFromOptionString() throws Exception { - assertSameLimits(BOTH_FULL, makeLog(null)); - assertSameLimits(HEADER_FULL, makeLog("{h}")); - assertSameLimits(MSG_FULL, makeLog("{m}")); - assertSameLimits(HEADER_256, makeLog("{h:256}")); - assertSameLimits(MSG_256, makeLog("{m:256}")); - assertSameLimits(BOTH_256, makeLog("{h:256;m:256}")); - assertSameLimits( - new Builder().header(Integer.MAX_VALUE).msg(256).build(), - makeLog("{h;m:256}")); - assertSameLimits( - new Builder().header(256).msg(Integer.MAX_VALUE).build(), - makeLog("{h:256;m}")); - } - - @Test - public void createLogFromOptionString_malformed() throws Exception { - assertNull(makeLog("bad")); - assertNull(makeLog("{bad}")); - assertNull(makeLog("{x;y}")); - assertNull(makeLog("{h:abc}")); - assertNull(makeLog("{2}")); - assertNull(makeLog("{2;2}")); - // The grammar specifies that if both h and m are present, h comes before m - assertNull(makeLog("{m:123;h:123}")); - // NumberFormatException - assertNull(makeLog("{h:99999999999999}")); - } - - @Test - public void configBinLog_multiConfig_withGlobal() throws Exception { - String configStr = - "*{h}," - + "package.both256/*{h:256;m:256}," - + "package.service1/both128{h:128;m:128}," - + "package.service2/method_messageOnly{m}"; - assertSameLimits(HEADER_FULL, makeLog(configStr, "otherpackage.service/method")); - - assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1")); - assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2")); - assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3")); - - assertSameLimits( - new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128")); - // the global config is in effect - assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service1/absent")); - - assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly")); - // the global config is in effect - assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service2/absent")); - } - - @Test - public void configBinLog_multiConfig_noGlobal() throws Exception { - String configStr = - "package.both256/*{h:256;m:256}," - + "package.service1/both128{h:128;m:128}," - + "package.service2/method_messageOnly{m}"; - assertNull(makeLog(configStr, "otherpackage.service/method")); - - assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1")); - assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2")); - assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3")); - - assertSameLimits( - new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128")); - // no global config in effect - assertNull(makeLog(configStr, "package.service1/absent")); - - assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly")); - // no global config in effect - assertNull(makeLog(configStr, "package.service2/absent")); - } - - @Test - public void configBinLog_ignoreDuplicates_global() throws Exception { - String configStr = "*{h},p.s/m,*{h:256}"; - // The duplicate - assertSameLimits(HEADER_FULL, makeLog(configStr, "p.other1/m")); - assertSameLimits(HEADER_FULL, makeLog(configStr, "p.other2/m")); - // Other - assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m")); - } - - @Test - public void configBinLog_ignoreDuplicates_service() throws Exception { - String configStr = "p.s/*,*{h:256},p.s/*{h}"; - // The duplicate - assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m1")); - assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m2")); - // Other - assertSameLimits(HEADER_256, makeLog(configStr, "p.other1/m")); - assertSameLimits(HEADER_256, makeLog(configStr, "p.other2/m")); - } - - @Test - public void configBinLog_ignoreDuplicates_method() throws Exception { - String configStr = "p.s/m,*{h:256},p.s/m{h}"; - // The duplicate - assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m")); - // Other - assertSameLimits(HEADER_256, makeLog(configStr, "p.other1/m")); - assertSameLimits(HEADER_256, makeLog(configStr, "p.other2/m")); - } - - @Test - public void callIdToProto() { - CallId callId = new CallId(0x1112131415161718L, 0x19101a1b1c1d1e1fL); - assertEquals( - Uint128 - .newBuilder() - .setHigh(0x1112131415161718L) - .setLow(0x19101a1b1c1d1e1fL) - .build(), - BinaryLog.callIdToProto(callId)); - } - - @Test - public void socketToProto_ipv4() throws Exception { - InetAddress address = InetAddress.getByName("127.0.0.1"); - int port = 12345; - InetSocketAddress socketAddress = new InetSocketAddress(address, port); - byte[] addressBytes = address.getAddress(); - byte[] portBytes = ByteBuffer.allocate(4).putInt(port).array(); - byte[] portUnsignedBytes = Arrays.copyOfRange(portBytes, 2, 4); - assertEquals( - Peer - .newBuilder() - .setPeerType(Peer.PeerType.PEER_IPV4) - .setPeer(ByteString.copyFrom(Bytes.concat(addressBytes, portUnsignedBytes))) - .build(), - BinaryLog.socketToProto(socketAddress)); - } - - @Test - public void socketToProto_ipv6() throws Exception { - // this is a ipv6 link local address - InetAddress address = InetAddress.getByName("fe:80:12:34:56:78:90:ab"); - int port = 12345; - InetSocketAddress socketAddress = new InetSocketAddress(address, port); - byte[] addressBytes = address.getAddress(); - byte[] portBytes = ByteBuffer.allocate(4).putInt(port).array(); - byte[] portUnsignedBytes = Arrays.copyOfRange(portBytes, 2, 4); - assertEquals( - Peer - .newBuilder() - .setPeerType(Peer.PeerType.PEER_IPV6) - .setPeer(ByteString.copyFrom(Bytes.concat(addressBytes, portUnsignedBytes))) - .build(), - BinaryLog.socketToProto(socketAddress)); - } - - @Test - public void socketToProto_unix() throws Exception { - String path = "/some/path"; - DomainSocketAddress socketAddress = new DomainSocketAddress(path); - assertEquals( - Peer - .newBuilder() - .setPeerType(Peer.PeerType.PEER_UNIX) - .setPeer(ByteString.copyFrom(path.getBytes(US_ASCII))) - .build(), - BinaryLog.socketToProto(socketAddress) - ); - } - - @Test - public void socketToProto_unknown() throws Exception { - SocketAddress unknownSocket = new SocketAddress() { }; - assertEquals( - Peer.newBuilder() - .setPeerType(PeerType.UNKNOWN_PEERTYPE) - .setPeer(ByteString.copyFrom(unknownSocket.toString(), US_ASCII)) - .build(), - BinaryLog.socketToProto(unknownSocket)); - } - - @Test - public void metadataToProto_empty() throws Exception { - assertEquals( - io.grpc.binarylog.Metadata.getDefaultInstance(), - BinaryLog.metadataToProto(new Metadata(), Integer.MAX_VALUE)); - } - - @Test - public void metadataToProto() throws Exception { - assertEquals( - io.grpc.binarylog.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .addEntry(ENTRY_B) - .addEntry(ENTRY_C) - .build(), - BinaryLog.metadataToProto(nonEmptyMetadata, Integer.MAX_VALUE)); - } - - @Test - public void metadataToProto_truncated() throws Exception { - // 0 byte limit not enough for any metadata - assertEquals( - io.grpc.binarylog.Metadata.getDefaultInstance(), - BinaryLog.metadataToProto(nonEmptyMetadata, 0)); - // not enough bytes for first key value - assertEquals( - io.grpc.binarylog.Metadata.getDefaultInstance(), - BinaryLog.metadataToProto(nonEmptyMetadata, 9)); - // enough for first key value - assertEquals( - io.grpc.binarylog.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .build(), - BinaryLog.metadataToProto(nonEmptyMetadata, 10)); - // Test edge cases for >= 2 key values - assertEquals( - io.grpc.binarylog.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .build(), - BinaryLog.metadataToProto(nonEmptyMetadata, 19)); - assertEquals( - io.grpc.binarylog.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .addEntry(ENTRY_B) - .build(), - BinaryLog.metadataToProto(nonEmptyMetadata, 20)); - assertEquals( - io.grpc.binarylog.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .addEntry(ENTRY_B) - .build(), - BinaryLog.metadataToProto(nonEmptyMetadata, 29)); - assertEquals( - io.grpc.binarylog.Metadata - .newBuilder() - .addEntry(ENTRY_A) - .addEntry(ENTRY_B) - .addEntry(ENTRY_C) - .build(), - BinaryLog.metadataToProto(nonEmptyMetadata, 30)); - } - - @Test - public void messageToProto() throws Exception { - byte[] bytes - = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII); - Message message = BinaryLog.messageToProto(bytes, false, Integer.MAX_VALUE); - assertEquals( - Message - .newBuilder() - .setData(ByteString.copyFrom(bytes)) - .setFlags(0) - .setLength(bytes.length) - .build(), - message); - } - - @Test - public void messageToProto_truncated() throws Exception { - byte[] bytes - = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII); - assertEquals( - Message - .newBuilder() - .setFlags(0) - .setLength(bytes.length) - .build(), - BinaryLog.messageToProto(bytes, false, 0)); - - int limit = 10; - String truncatedMessage = "this is a "; - assertEquals( - Message - .newBuilder() - .setData(ByteString.copyFrom(truncatedMessage.getBytes(US_ASCII))) - .setFlags(0) - .setLength(bytes.length) - .build(), - BinaryLog.messageToProto(bytes, false, limit)); - } - - @Test - public void toFlag() throws Exception { - assertEquals(0, BinaryLog.flagsForMessage(IS_UNCOMPRESSED)); - assertEquals(1, BinaryLog.flagsForMessage(IS_COMPRESSED)); - } - - @Test - public void logSendInitialMetadata_server() throws Exception { - sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) - .setLogger(GrpcLogEntry.Logger.SERVER) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) - .build()); - } - - @Test - public void logSendInitialMetadata_client() throws Exception { - sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) - .setLogger(GrpcLogEntry.Logger.CLIENT) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) - .build()); - } - - @Test - public void logRecvInitialMetadata_server() throws Exception { - InetAddress address = InetAddress.getByName("127.0.0.1"); - int port = 12345; - InetSocketAddress socketAddress = new InetSocketAddress(address, port); - sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) - .setLogger(GrpcLogEntry.Logger.SERVER) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setPeer(BinaryLog.socketToProto(socketAddress)) - .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) - .build()); - } - - @Test - public void logRecvInitialMetadata_client() throws Exception { - InetAddress address = InetAddress.getByName("127.0.0.1"); - int port = 12345; - InetSocketAddress socketAddress = new InetSocketAddress(address, port); - sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) - .setLogger(GrpcLogEntry.Logger.CLIENT) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setPeer(BinaryLog.socketToProto(socketAddress)) - .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) - .build()); - } - - @Test - public void logTrailingMetadata_server() throws Exception { - sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.SEND_TRAILING_METADATA) - .setLogger(GrpcLogEntry.Logger.SERVER) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) - .build()); - } - - @Test - public void logTrailingMetadata_client() throws Exception { - sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.RECV_TRAILING_METADATA) - .setLogger(GrpcLogEntry.Logger.CLIENT) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) - .build()); - } - - @Test - public void logOutboundMessage_server() throws Exception { - sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.SEND_MESSAGE) - .setLogger(GrpcLogEntry.Logger.SERVER) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) - .build()); - - sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.SEND_MESSAGE) - .setLogger(GrpcLogEntry.Logger.SERVER) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMessage( - BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) - .build()); - verifyNoMoreInteractions(sink); - } - - @Test - public void logOutboundMessage_client() throws Exception { - sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.SEND_MESSAGE) - .setLogger(GrpcLogEntry.Logger.CLIENT) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) - .build()); - - sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.SEND_MESSAGE) - .setLogger(GrpcLogEntry.Logger.CLIENT) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMessage( - BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) - .build()); - verifyNoMoreInteractions(sink); - } - - @Test - public void logInboundMessage_server() throws Exception { - sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.RECV_MESSAGE) - .setLogger(GrpcLogEntry.Logger.SERVER) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) - .build()); - - sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.RECV_MESSAGE) - .setLogger(GrpcLogEntry.Logger.SERVER) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMessage( - BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) - .build()); - verifyNoMoreInteractions(sink); - } - - @Test - public void logInboundMessage_client() throws Exception { - sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.RECV_MESSAGE) - .setLogger(GrpcLogEntry.Logger.CLIENT) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMessage(BinaryLog.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) - .build()); - - sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); - verify(sink).write( - GrpcLogEntry - .newBuilder() - .setType(GrpcLogEntry.Type.RECV_MESSAGE) - .setLogger(GrpcLogEntry.Logger.CLIENT) - .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setMessage( - BinaryLog.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) - .build()); - verifyNoMoreInteractions(sink); - } - - @Test - public void getPeerSocketTest() { - assertSame(DUMMY_SOCKET, getPeerSocket(Attributes.EMPTY)); - assertSame( - peer, - getPeerSocket(Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build())); - } - - @Test - @SuppressWarnings({"rawtypes", "unchecked"}) - public void clientInterceptor() throws Exception { - final AtomicReference interceptedListener = - new AtomicReference(); - // capture these manually because ClientCall can not be mocked - final AtomicReference actualClientInitial = new AtomicReference(); - final AtomicReference actualRequest = new AtomicReference(); - - Channel channel = new Channel() { - @Override - public ClientCall newCall( - MethodDescriptor methodDescriptor, CallOptions callOptions) { - return new NoopClientCall() { - @Override - public void start(Listener responseListener, Metadata headers) { - interceptedListener.set(responseListener); - actualClientInitial.set(headers); - } - - @Override - public void sendMessage(RequestT message) { - actualRequest.set(message); - } - - @Override - public Attributes getAttributes() { - return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); - } - }; - } - - @Override - public String authority() { - throw new UnsupportedOperationException(); - } - }; - - ClientCall.Listener mockListener = mock(ClientCall.Listener.class); - - MethodDescriptor method = - MethodDescriptor.newBuilder() - .setType(MethodType.UNKNOWN) - .setFullMethodName("service/method") - .setRequestMarshaller(BYTEARRAY_MARSHALLER) - .setResponseMarshaller(BYTEARRAY_MARSHALLER) - .build(); - ClientCall interceptedCall = - new BinaryLog(mockSinkWriter) - .getClientInterceptor(CALL_ID) - .interceptCall( - method, - CallOptions.DEFAULT.withOption( - BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, CALL_ID), - channel); - - // send initial metadata - { - Metadata clientInitial = new Metadata(); - interceptedCall.start(mockListener, clientInitial); - verify(mockSinkWriter).logSendInitialMetadata( - same(clientInitial), - eq(IS_CLIENT), - same(CALL_ID)); - verifyNoMoreInteractions(mockSinkWriter); - assertSame(clientInitial, actualClientInitial.get()); - } - - // receive initial metadata - { - Metadata serverInitial = new Metadata(); - interceptedListener.get().onHeaders(serverInitial); - verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial), - eq(IS_CLIENT), - same(CALL_ID), - same(peer)); - verifyNoMoreInteractions(mockSinkWriter); - verify(mockListener).onHeaders(same(serverInitial)); - } - - // send request - { - byte[] request = "this is a request".getBytes(US_ASCII); - interceptedCall.sendMessage(request); - verify(mockSinkWriter).logOutboundMessage( - same(BYTEARRAY_MARSHALLER), - same(request), - eq(BinaryLog.DUMMY_IS_COMPRESSED), - eq(IS_CLIENT), - same(CALL_ID)); - verifyNoMoreInteractions(mockSinkWriter); - assertSame(request, actualRequest.get()); - } - - // receive response - { - byte[] response = "this is a response".getBytes(US_ASCII); - interceptedListener.get().onMessage(response); - verify(mockSinkWriter).logInboundMessage( - same(BYTEARRAY_MARSHALLER), - eq(response), - eq(BinaryLog.DUMMY_IS_COMPRESSED), - eq(IS_CLIENT), - same(CALL_ID)); - verifyNoMoreInteractions(mockSinkWriter); - verify(mockListener).onMessage(same(response)); - } - - // receive trailers - { - Status status = Status.INTERNAL.withDescription("some description"); - Metadata trailers = new Metadata(); - - interceptedListener.get().onClose(status, trailers); - verify(mockSinkWriter).logTrailingMetadata( - same(trailers), - eq(IS_CLIENT), - same(CALL_ID)); - verifyNoMoreInteractions(mockSinkWriter); - verify(mockListener).onClose(same(status), same(trailers)); - } - } - - @Test - @SuppressWarnings({"rawtypes", "unchecked"}) - public void serverInterceptor() throws Exception { - final AtomicReference interceptedCall = - new AtomicReference(); - ServerCall.Listener capturedListener; - final ServerCall.Listener mockListener = mock(ServerCall.Listener.class); - // capture these manually because ServerCall can not be mocked - final AtomicReference actualServerInitial = new AtomicReference(); - final AtomicReference actualResponse = new AtomicReference(); - final AtomicReference actualStatus = new AtomicReference(); - final AtomicReference actualTrailers = new AtomicReference(); - - // begin call and receive initial metadata - { - Metadata clientInitial = new Metadata(); - final MethodDescriptor method = - MethodDescriptor.newBuilder() - .setType(MethodType.UNKNOWN) - .setFullMethodName("service/method") - .setRequestMarshaller(BYTEARRAY_MARSHALLER) - .setResponseMarshaller(BYTEARRAY_MARSHALLER) - .build(); - capturedListener = - new BinaryLog(mockSinkWriter) - .getServerInterceptor(CALL_ID) - .interceptCall( - new NoopServerCall() { - @Override - public void sendHeaders(Metadata headers) { - actualServerInitial.set(headers); - } - - @Override - public void sendMessage(byte[] message) { - actualResponse.set(message); - } - - @Override - public void close(Status status, Metadata trailers) { - actualStatus.set(status); - actualTrailers.set(trailers); - } - - @Override - public MethodDescriptor getMethodDescriptor() { - return method; - } - - @Override - public Attributes getAttributes() { - return Attributes - .newBuilder() - .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer) - .build(); - } - }, - clientInitial, - new ServerCallHandler() { - @Override - public ServerCall.Listener startCall( - ServerCall call, - Metadata headers) { - interceptedCall.set(call); - return mockListener; - } - }); - verify(mockSinkWriter).logRecvInitialMetadata( - same(clientInitial), - eq(IS_SERVER), - same(CALL_ID), - same(peer)); - verifyNoMoreInteractions(mockSinkWriter); - } - - // send initial metadata - { - Metadata serverInital = new Metadata(); - interceptedCall.get().sendHeaders(serverInital); - verify(mockSinkWriter).logSendInitialMetadata( - same(serverInital), - eq(IS_SERVER), - same(CALL_ID)); - verifyNoMoreInteractions(mockSinkWriter); - assertSame(serverInital, actualServerInitial.get()); - } - - // receive request - { - byte[] request = "this is a request".getBytes(US_ASCII); - capturedListener.onMessage(request); - verify(mockSinkWriter).logInboundMessage( - same(BYTEARRAY_MARSHALLER), - same(request), - eq(BinaryLog.DUMMY_IS_COMPRESSED), - eq(IS_SERVER), - same(CALL_ID)); - verifyNoMoreInteractions(mockSinkWriter); - verify(mockListener).onMessage(same(request)); - } - - // send response - { - byte[] response = "this is a response".getBytes(US_ASCII); - interceptedCall.get().sendMessage(response); - verify(mockSinkWriter).logOutboundMessage( - same(BYTEARRAY_MARSHALLER), - same(response), - eq(BinaryLog.DUMMY_IS_COMPRESSED), - eq(IS_SERVER), - same(CALL_ID)); - verifyNoMoreInteractions(mockSinkWriter); - assertSame(response, actualResponse.get()); - } - - // send trailers - { - Status status = Status.INTERNAL.withDescription("some description"); - Metadata trailers = new Metadata(); - interceptedCall.get().close(status, trailers); - verify(mockSinkWriter).logTrailingMetadata( - same(trailers), - eq(IS_SERVER), - same(CALL_ID)); - verifyNoMoreInteractions(mockSinkWriter); - assertSame(status, actualStatus.get()); - assertSame(trailers, actualTrailers.get()); - } - } - - /** A builder class to make unit test code more readable. */ - private static final class Builder { - int maxHeaderBytes = 0; - int maxMessageBytes = 0; - - Builder header(int bytes) { - maxHeaderBytes = bytes; - return this; - } - - Builder msg(int bytes) { - maxMessageBytes = bytes; - return this; - } - - BinaryLog build() { - return new BinaryLog( - new SinkWriterImpl(mock(BinaryLogSink.class), maxHeaderBytes, maxMessageBytes)); - } - } - - private static void assertSameLimits(BinaryLog a, BinaryLog b) { - assertEquals(a.writer.getMaxMessageBytes(), b.writer.getMaxMessageBytes()); - assertEquals(a.writer.getMaxHeaderBytes(), b.writer.getMaxHeaderBytes()); - } - - private BinaryLog makeLog(String factoryConfigStr, String lookup) { - return new BinaryLog.FactoryImpl(sink, factoryConfigStr).getLog(lookup); - } - - private BinaryLog makeLog(String logConfigStr) { - return FactoryImpl.createBinaryLog(sink, logConfigStr); - } -} diff --git a/services/src/test/java/io/grpc/services/BinlogHelperTest.java b/services/src/test/java/io/grpc/services/BinlogHelperTest.java new file mode 100644 index 000000000..07c334931 --- /dev/null +++ b/services/src/test/java/io/grpc/services/BinlogHelperTest.java @@ -0,0 +1,974 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER; +import static io.grpc.services.BinlogHelper.DUMMY_SOCKET; +import static io.grpc.services.BinlogHelper.getPeerSocket; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.common.primitives.Bytes; +import com.google.protobuf.ByteString; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.Grpc; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.Status; +import io.grpc.binarylog.GrpcLogEntry; +import io.grpc.binarylog.Message; +import io.grpc.binarylog.MetadataEntry; +import io.grpc.binarylog.Peer; +import io.grpc.binarylog.Peer.PeerType; +import io.grpc.binarylog.Uint128; +import io.grpc.internal.BinaryLogProvider; +import io.grpc.internal.BinaryLogProvider.CallId; +import io.grpc.internal.NoopClientCall; +import io.grpc.internal.NoopServerCall; +import io.grpc.services.BinlogHelper.FactoryImpl; +import io.grpc.services.BinlogHelper.SinkWriter; +import io.grpc.services.BinlogHelper.SinkWriterImpl; +import io.netty.channel.unix.DomainSocketAddress; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BinlogHelper}. */ +@RunWith(JUnit4.class) +public final class BinlogHelperTest { + private static final Charset US_ASCII = Charset.forName("US-ASCII"); + private static final BinlogHelper HEADER_FULL = new Builder().header(Integer.MAX_VALUE).build(); + private static final BinlogHelper HEADER_256 = new Builder().header(256).build(); + private static final BinlogHelper MSG_FULL = new Builder().msg(Integer.MAX_VALUE).build(); + private static final BinlogHelper MSG_256 = new Builder().msg(256).build(); + private static final BinlogHelper BOTH_256 = new Builder().header(256).msg(256).build(); + private static final BinlogHelper BOTH_FULL = + new Builder().header(Integer.MAX_VALUE).msg(Integer.MAX_VALUE).build(); + + private static final String DATA_A = "aaaaaaaaa"; + private static final String DATA_B = "bbbbbbbbb"; + private static final String DATA_C = "ccccccccc"; + private static final Metadata.Key KEY_A = + Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key KEY_B = + Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key KEY_C = + Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER); + private static final MetadataEntry ENTRY_A = + MetadataEntry + .newBuilder() + .setKey(ByteString.copyFrom(KEY_A.name(), US_ASCII)) + .setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII))) + .build(); + private static final MetadataEntry ENTRY_B = + MetadataEntry + .newBuilder() + .setKey(ByteString.copyFrom(KEY_B.name(), US_ASCII)) + .setValue(ByteString.copyFrom(DATA_B.getBytes(US_ASCII))) + .build(); + private static final MetadataEntry ENTRY_C = + MetadataEntry + .newBuilder() + .setKey(ByteString.copyFrom(KEY_C.name(), US_ASCII)) + .setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII))) + .build(); + private static final boolean IS_SERVER = true; + private static final boolean IS_CLIENT = false; + private static final boolean IS_COMPRESSED = true; + private static final boolean IS_UNCOMPRESSED = false; + // TODO(zpencer): rename this to callId, since byte[] is mutable + private static final CallId CALL_ID = + new CallId(0x1112131415161718L, 0x19101a1b1c1d1e1fL); + private static final int HEADER_LIMIT = 10; + private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; + + private final Metadata nonEmptyMetadata = new Metadata(); + private final BinaryLogSink sink = mock(BinaryLogSink.class); + private final SinkWriter sinkWriterImpl = + new SinkWriterImpl(sink, HEADER_LIMIT, MESSAGE_LIMIT); + private final SinkWriter mockSinkWriter = mock(SinkWriter.class); + private final byte[] message = new byte[100]; + private SocketAddress peer; + + @Before + public void setUp() throws Exception { + nonEmptyMetadata.put(KEY_A, DATA_A); + nonEmptyMetadata.put(KEY_B, DATA_B); + nonEmptyMetadata.put(KEY_C, DATA_C); + peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234); + } + + @Test + public void configBinLog_global() throws Exception { + assertSameLimits(BOTH_FULL, makeLog("*", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("*{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("*{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("*{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("*{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("*{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("*{h:256;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(Integer.MAX_VALUE).msg(256).build(), + makeLog("*{h;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(256).msg(Integer.MAX_VALUE).build(), + makeLog("*{h:256;m}", "p.s/m")); + } + + @Test + public void configBinLog_method() throws Exception { + assertSameLimits(BOTH_FULL, makeLog("p.s/m", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("p.s/m{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("p.s/m{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("p.s/m{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("p.s/m{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("p.s/m{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("p.s/m{h:256;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(Integer.MAX_VALUE).msg(256).build(), + makeLog("p.s/m{h;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(256).msg(Integer.MAX_VALUE).build(), + makeLog("p.s/m{h:256;m}", "p.s/m")); + } + + @Test + public void configBinLog_method_absent() throws Exception { + assertNull(makeLog("p.s/m", "p.s/absent")); + } + + @Test + public void configBinLog_service() throws Exception { + assertSameLimits(BOTH_FULL, makeLog("p.s/*", "p.s/m")); + assertSameLimits(BOTH_FULL, makeLog("p.s/*{h;m}", "p.s/m")); + assertSameLimits(HEADER_FULL, makeLog("p.s/*{h}", "p.s/m")); + assertSameLimits(MSG_FULL, makeLog("p.s/*{m}", "p.s/m")); + assertSameLimits(HEADER_256, makeLog("p.s/*{h:256}", "p.s/m")); + assertSameLimits(MSG_256, makeLog("p.s/*{m:256}", "p.s/m")); + assertSameLimits(BOTH_256, makeLog("p.s/*{h:256;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(Integer.MAX_VALUE).msg(256).build(), + makeLog("p.s/*{h;m:256}", "p.s/m")); + assertSameLimits( + new Builder().header(256).msg(Integer.MAX_VALUE).build(), + makeLog("p.s/*{h:256;m}", "p.s/m")); + } + + @Test + public void configBinLog_service_absent() throws Exception { + assertNull(makeLog("p.s/*", "p.other/m")); + } + + @Test + public void createLogFromOptionString() throws Exception { + assertSameLimits(BOTH_FULL, makeLog(null)); + assertSameLimits(HEADER_FULL, makeLog("{h}")); + assertSameLimits(MSG_FULL, makeLog("{m}")); + assertSameLimits(HEADER_256, makeLog("{h:256}")); + assertSameLimits(MSG_256, makeLog("{m:256}")); + assertSameLimits(BOTH_256, makeLog("{h:256;m:256}")); + assertSameLimits( + new Builder().header(Integer.MAX_VALUE).msg(256).build(), + makeLog("{h;m:256}")); + assertSameLimits( + new Builder().header(256).msg(Integer.MAX_VALUE).build(), + makeLog("{h:256;m}")); + } + + @Test + public void createLogFromOptionString_malformed() throws Exception { + assertNull(makeLog("bad")); + assertNull(makeLog("{bad}")); + assertNull(makeLog("{x;y}")); + assertNull(makeLog("{h:abc}")); + assertNull(makeLog("{2}")); + assertNull(makeLog("{2;2}")); + // The grammar specifies that if both h and m are present, h comes before m + assertNull(makeLog("{m:123;h:123}")); + // NumberFormatException + assertNull(makeLog("{h:99999999999999}")); + } + + @Test + public void configBinLog_multiConfig_withGlobal() throws Exception { + String configStr = + "*{h}," + + "package.both256/*{h:256;m:256}," + + "package.service1/both128{h:128;m:128}," + + "package.service2/method_messageOnly{m}"; + assertSameLimits(HEADER_FULL, makeLog(configStr, "otherpackage.service/method")); + + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3")); + + assertSameLimits( + new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128")); + // the global config is in effect + assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service1/absent")); + + assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly")); + // the global config is in effect + assertSameLimits(HEADER_FULL, makeLog(configStr, "package.service2/absent")); + } + + @Test + public void configBinLog_multiConfig_noGlobal() throws Exception { + String configStr = + "package.both256/*{h:256;m:256}," + + "package.service1/both128{h:128;m:128}," + + "package.service2/method_messageOnly{m}"; + assertNull(makeLog(configStr, "otherpackage.service/method")); + + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method1")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method2")); + assertSameLimits(BOTH_256, makeLog(configStr, "package.both256/method3")); + + assertSameLimits( + new Builder().header(128).msg(128).build(), makeLog(configStr, "package.service1/both128")); + // no global config in effect + assertNull(makeLog(configStr, "package.service1/absent")); + + assertSameLimits(MSG_FULL, makeLog(configStr, "package.service2/method_messageOnly")); + // no global config in effect + assertNull(makeLog(configStr, "package.service2/absent")); + } + + @Test + public void configBinLog_ignoreDuplicates_global() throws Exception { + String configStr = "*{h},p.s/m,*{h:256}"; + // The duplicate + assertSameLimits(HEADER_FULL, makeLog(configStr, "p.other1/m")); + assertSameLimits(HEADER_FULL, makeLog(configStr, "p.other2/m")); + // Other + assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m")); + } + + @Test + public void configBinLog_ignoreDuplicates_service() throws Exception { + String configStr = "p.s/*,*{h:256},p.s/*{h}"; + // The duplicate + assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m1")); + assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m2")); + // Other + assertSameLimits(HEADER_256, makeLog(configStr, "p.other1/m")); + assertSameLimits(HEADER_256, makeLog(configStr, "p.other2/m")); + } + + @Test + public void configBinLog_ignoreDuplicates_method() throws Exception { + String configStr = "p.s/m,*{h:256},p.s/m{h}"; + // The duplicate + assertSameLimits(BOTH_FULL, makeLog(configStr, "p.s/m")); + // Other + assertSameLimits(HEADER_256, makeLog(configStr, "p.other1/m")); + assertSameLimits(HEADER_256, makeLog(configStr, "p.other2/m")); + } + + @Test + public void callIdToProto() { + CallId callId = new CallId(0x1112131415161718L, 0x19101a1b1c1d1e1fL); + assertEquals( + Uint128 + .newBuilder() + .setHigh(0x1112131415161718L) + .setLow(0x19101a1b1c1d1e1fL) + .build(), + BinlogHelper.callIdToProto(callId)); + } + + @Test + public void socketToProto_ipv4() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + byte[] addressBytes = address.getAddress(); + byte[] portBytes = ByteBuffer.allocate(4).putInt(port).array(); + byte[] portUnsignedBytes = Arrays.copyOfRange(portBytes, 2, 4); + assertEquals( + Peer + .newBuilder() + .setPeerType(Peer.PeerType.PEER_IPV4) + .setPeer(ByteString.copyFrom(Bytes.concat(addressBytes, portUnsignedBytes))) + .build(), + BinlogHelper.socketToProto(socketAddress)); + } + + @Test + public void socketToProto_ipv6() throws Exception { + // this is a ipv6 link local address + InetAddress address = InetAddress.getByName("fe:80:12:34:56:78:90:ab"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + byte[] addressBytes = address.getAddress(); + byte[] portBytes = ByteBuffer.allocate(4).putInt(port).array(); + byte[] portUnsignedBytes = Arrays.copyOfRange(portBytes, 2, 4); + assertEquals( + Peer + .newBuilder() + .setPeerType(Peer.PeerType.PEER_IPV6) + .setPeer(ByteString.copyFrom(Bytes.concat(addressBytes, portUnsignedBytes))) + .build(), + BinlogHelper.socketToProto(socketAddress)); + } + + @Test + public void socketToProto_unix() throws Exception { + String path = "/some/path"; + DomainSocketAddress socketAddress = new DomainSocketAddress(path); + assertEquals( + Peer + .newBuilder() + .setPeerType(Peer.PeerType.PEER_UNIX) + .setPeer(ByteString.copyFrom(path.getBytes(US_ASCII))) + .build(), + BinlogHelper.socketToProto(socketAddress) + ); + } + + @Test + public void socketToProto_unknown() throws Exception { + SocketAddress unknownSocket = new SocketAddress() { }; + assertEquals( + Peer.newBuilder() + .setPeerType(PeerType.UNKNOWN_PEERTYPE) + .setPeer(ByteString.copyFrom(unknownSocket.toString(), US_ASCII)) + .build(), + BinlogHelper.socketToProto(unknownSocket)); + } + + @Test + public void metadataToProto_empty() throws Exception { + assertEquals( + io.grpc.binarylog.Metadata.getDefaultInstance(), + BinlogHelper.metadataToProto(new Metadata(), Integer.MAX_VALUE)); + } + + @Test + public void metadataToProto() throws Exception { + assertEquals( + io.grpc.binarylog.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .addEntry(ENTRY_C) + .build(), + BinlogHelper.metadataToProto(nonEmptyMetadata, Integer.MAX_VALUE)); + } + + @Test + public void metadataToProto_truncated() throws Exception { + // 0 byte limit not enough for any metadata + assertEquals( + io.grpc.binarylog.Metadata.getDefaultInstance(), + BinlogHelper.metadataToProto(nonEmptyMetadata, 0)); + // not enough bytes for first key value + assertEquals( + io.grpc.binarylog.Metadata.getDefaultInstance(), + BinlogHelper.metadataToProto(nonEmptyMetadata, 9)); + // enough for first key value + assertEquals( + io.grpc.binarylog.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .build(), + BinlogHelper.metadataToProto(nonEmptyMetadata, 10)); + // Test edge cases for >= 2 key values + assertEquals( + io.grpc.binarylog.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .build(), + BinlogHelper.metadataToProto(nonEmptyMetadata, 19)); + assertEquals( + io.grpc.binarylog.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .build(), + BinlogHelper.metadataToProto(nonEmptyMetadata, 20)); + assertEquals( + io.grpc.binarylog.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .build(), + BinlogHelper.metadataToProto(nonEmptyMetadata, 29)); + assertEquals( + io.grpc.binarylog.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .addEntry(ENTRY_C) + .build(), + BinlogHelper.metadataToProto(nonEmptyMetadata, 30)); + } + + @Test + public void messageToProto() throws Exception { + byte[] bytes + = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII); + Message message = BinlogHelper.messageToProto(bytes, false, Integer.MAX_VALUE); + assertEquals( + Message + .newBuilder() + .setData(ByteString.copyFrom(bytes)) + .setFlags(0) + .setLength(bytes.length) + .build(), + message); + } + + @Test + public void messageToProto_truncated() throws Exception { + byte[] bytes + = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(US_ASCII); + assertEquals( + Message + .newBuilder() + .setFlags(0) + .setLength(bytes.length) + .build(), + BinlogHelper.messageToProto(bytes, false, 0)); + + int limit = 10; + String truncatedMessage = "this is a "; + assertEquals( + Message + .newBuilder() + .setData(ByteString.copyFrom(truncatedMessage.getBytes(US_ASCII))) + .setFlags(0) + .setLength(bytes.length) + .build(), + BinlogHelper.messageToProto(bytes, false, limit)); + } + + @Test + public void toFlag() throws Exception { + assertEquals(0, BinlogHelper.flagsForMessage(IS_UNCOMPRESSED)); + assertEquals(1, BinlogHelper.flagsForMessage(IS_COMPRESSED)); + } + + @Test + public void logSendInitialMetadata_server() throws Exception { + sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMetadata(BinlogHelper.metadataToProto(nonEmptyMetadata, 10)) + .build()); + } + + @Test + public void logSendInitialMetadata_client() throws Exception { + sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMetadata(BinlogHelper.metadataToProto(nonEmptyMetadata, 10)) + .build()); + } + + @Test + public void logRecvInitialMetadata_server() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setPeer(BinlogHelper.socketToProto(socketAddress)) + .setMetadata(BinlogHelper.metadataToProto(nonEmptyMetadata, 10)) + .build()); + } + + @Test + public void logRecvInitialMetadata_client() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setPeer(BinlogHelper.socketToProto(socketAddress)) + .setMetadata(BinlogHelper.metadataToProto(nonEmptyMetadata, 10)) + .build()); + } + + @Test + public void logTrailingMetadata_server() throws Exception { + sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_TRAILING_METADATA) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMetadata(BinlogHelper.metadataToProto(nonEmptyMetadata, 10)) + .build()); + } + + @Test + public void logTrailingMetadata_client() throws Exception { + sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_TRAILING_METADATA) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMetadata(BinlogHelper.metadataToProto(nonEmptyMetadata, 10)) + .build()); + } + + @Test + public void logOutboundMessage_server() throws Exception { + sinkWriterImpl.logOutboundMessage( + BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_MESSAGE) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMessage(BinlogHelper.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) + .build()); + + sinkWriterImpl.logOutboundMessage( + BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_MESSAGE) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMessage( + BinlogHelper.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) + .build()); + verifyNoMoreInteractions(sink); + } + + @Test + public void logOutboundMessage_client() throws Exception { + sinkWriterImpl.logOutboundMessage( + BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_MESSAGE) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMessage(BinlogHelper.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) + .build()); + + sinkWriterImpl.logOutboundMessage( + BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.SEND_MESSAGE) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMessage( + BinlogHelper.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) + .build()); + verifyNoMoreInteractions(sink); + } + + @Test + public void logInboundMessage_server() throws Exception { + sinkWriterImpl.logInboundMessage( + BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_MESSAGE) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMessage(BinlogHelper.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) + .build()); + + sinkWriterImpl.logInboundMessage( + BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_MESSAGE) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMessage( + BinlogHelper.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) + .build()); + verifyNoMoreInteractions(sink); + } + + @Test + public void logInboundMessage_client() throws Exception { + sinkWriterImpl.logInboundMessage( + BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_MESSAGE) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMessage(BinlogHelper.messageToProto(message, IS_COMPRESSED, MESSAGE_LIMIT)) + .build()); + + sinkWriterImpl.logInboundMessage( + BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); + verify(sink).write( + GrpcLogEntry + .newBuilder() + .setType(GrpcLogEntry.Type.RECV_MESSAGE) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setMessage( + BinlogHelper.messageToProto(message, IS_UNCOMPRESSED, MESSAGE_LIMIT)) + .build()); + verifyNoMoreInteractions(sink); + } + + @Test + public void getPeerSocketTest() { + assertSame(DUMMY_SOCKET, getPeerSocket(Attributes.EMPTY)); + assertSame( + peer, + getPeerSocket(Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build())); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void clientInterceptor() throws Exception { + final AtomicReference interceptedListener = + new AtomicReference(); + // capture these manually because ClientCall can not be mocked + final AtomicReference actualClientInitial = new AtomicReference(); + final AtomicReference actualRequest = new AtomicReference(); + + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { + @Override + public void start(Listener responseListener, Metadata headers) { + interceptedListener.set(responseListener); + actualClientInitial.set(headers); + } + + @Override + public void sendMessage(RequestT message) { + actualRequest.set(message); + } + + @Override + public Attributes getAttributes() { + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); + } + }; + } + + @Override + public String authority() { + throw new UnsupportedOperationException(); + } + }; + + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + ClientCall interceptedCall = + new BinlogHelper(mockSinkWriter) + .getClientInterceptor(CALL_ID) + .interceptCall( + method, + CallOptions.DEFAULT.withOption( + BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, CALL_ID), + channel); + + // send initial metadata + { + Metadata clientInitial = new Metadata(); + interceptedCall.start(mockListener, clientInitial); + verify(mockSinkWriter).logSendInitialMetadata( + same(clientInitial), + eq(IS_CLIENT), + same(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(clientInitial, actualClientInitial.get()); + } + + // receive initial metadata + { + Metadata serverInitial = new Metadata(); + interceptedListener.get().onHeaders(serverInitial); + verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial), + eq(IS_CLIENT), + same(CALL_ID), + same(peer)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onHeaders(same(serverInitial)); + } + + // send request + { + byte[] request = "this is a request".getBytes(US_ASCII); + interceptedCall.sendMessage(request); + verify(mockSinkWriter).logOutboundMessage( + same(BYTEARRAY_MARSHALLER), + same(request), + eq(BinlogHelper.DUMMY_IS_COMPRESSED), + eq(IS_CLIENT), + same(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(request, actualRequest.get()); + } + + // receive response + { + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedListener.get().onMessage(response); + verify(mockSinkWriter).logInboundMessage( + same(BYTEARRAY_MARSHALLER), + eq(response), + eq(BinlogHelper.DUMMY_IS_COMPRESSED), + eq(IS_CLIENT), + same(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onMessage(same(response)); + } + + // receive trailers + { + Status status = Status.INTERNAL.withDescription("some description"); + Metadata trailers = new Metadata(); + + interceptedListener.get().onClose(status, trailers); + verify(mockSinkWriter).logTrailingMetadata( + same(trailers), + eq(IS_CLIENT), + same(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onClose(same(status), same(trailers)); + } + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void serverInterceptor() throws Exception { + final AtomicReference interceptedCall = + new AtomicReference(); + ServerCall.Listener capturedListener; + final ServerCall.Listener mockListener = mock(ServerCall.Listener.class); + // capture these manually because ServerCall can not be mocked + final AtomicReference actualServerInitial = new AtomicReference(); + final AtomicReference actualResponse = new AtomicReference(); + final AtomicReference actualStatus = new AtomicReference(); + final AtomicReference actualTrailers = new AtomicReference(); + + // begin call and receive initial metadata + { + Metadata clientInitial = new Metadata(); + final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + capturedListener = + new BinlogHelper(mockSinkWriter) + .getServerInterceptor(CALL_ID) + .interceptCall( + new NoopServerCall() { + @Override + public void sendHeaders(Metadata headers) { + actualServerInitial.set(headers); + } + + @Override + public void sendMessage(byte[] message) { + actualResponse.set(message); + } + + @Override + public void close(Status status, Metadata trailers) { + actualStatus.set(status); + actualTrailers.set(trailers); + } + + @Override + public MethodDescriptor getMethodDescriptor() { + return method; + } + + @Override + public Attributes getAttributes() { + return Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer) + .build(); + } + }, + clientInitial, + new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, + Metadata headers) { + interceptedCall.set(call); + return mockListener; + } + }); + verify(mockSinkWriter).logRecvInitialMetadata( + same(clientInitial), + eq(IS_SERVER), + same(CALL_ID), + same(peer)); + verifyNoMoreInteractions(mockSinkWriter); + } + + // send initial metadata + { + Metadata serverInital = new Metadata(); + interceptedCall.get().sendHeaders(serverInital); + verify(mockSinkWriter).logSendInitialMetadata( + same(serverInital), + eq(IS_SERVER), + same(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(serverInital, actualServerInitial.get()); + } + + // receive request + { + byte[] request = "this is a request".getBytes(US_ASCII); + capturedListener.onMessage(request); + verify(mockSinkWriter).logInboundMessage( + same(BYTEARRAY_MARSHALLER), + same(request), + eq(BinlogHelper.DUMMY_IS_COMPRESSED), + eq(IS_SERVER), + same(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + verify(mockListener).onMessage(same(request)); + } + + // send response + { + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedCall.get().sendMessage(response); + verify(mockSinkWriter).logOutboundMessage( + same(BYTEARRAY_MARSHALLER), + same(response), + eq(BinlogHelper.DUMMY_IS_COMPRESSED), + eq(IS_SERVER), + same(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(response, actualResponse.get()); + } + + // send trailers + { + Status status = Status.INTERNAL.withDescription("some description"); + Metadata trailers = new Metadata(); + interceptedCall.get().close(status, trailers); + verify(mockSinkWriter).logTrailingMetadata( + same(trailers), + eq(IS_SERVER), + same(CALL_ID)); + verifyNoMoreInteractions(mockSinkWriter); + assertSame(status, actualStatus.get()); + assertSame(trailers, actualTrailers.get()); + } + } + + /** A builder class to make unit test code more readable. */ + private static final class Builder { + int maxHeaderBytes = 0; + int maxMessageBytes = 0; + + Builder header(int bytes) { + maxHeaderBytes = bytes; + return this; + } + + Builder msg(int bytes) { + maxMessageBytes = bytes; + return this; + } + + BinlogHelper build() { + return new BinlogHelper( + new SinkWriterImpl(mock(BinaryLogSink.class), maxHeaderBytes, maxMessageBytes)); + } + } + + private static void assertSameLimits(BinlogHelper a, BinlogHelper b) { + assertEquals(a.writer.getMaxMessageBytes(), b.writer.getMaxMessageBytes()); + assertEquals(a.writer.getMaxHeaderBytes(), b.writer.getMaxHeaderBytes()); + } + + private BinlogHelper makeLog(String factoryConfigStr, String lookup) { + return new BinlogHelper.FactoryImpl(sink, factoryConfigStr).getLog(lookup); + } + + private BinlogHelper makeLog(String logConfigStr) { + return FactoryImpl.createBinaryLog(sink, logConfigStr); + } +} -- cgit v1.2.3