aboutsummaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2018-06-07 12:19:36 -0700
committerGitHub <noreply@github.com>2018-06-07 12:19:36 -0700
commitc05d0f40eae1686d22a4b2347043dd725c7a6f37 (patch)
tree02033a6c3b9fcdc1c120df7f921b80f3637388e1 /services
parentbc9d3ab7cad944a46c34afcd2bdc28c1b8154543 (diff)
downloadgrpc-grpc-java-c05d0f40eae1686d22a4b2347043dd725c7a6f37.tar.gz
services: add sequence id within call for entries (#4549)
This is a new field added to the binlog proto so that we can detect when the storage impl reorders or drops recorded entries.
Diffstat (limited to 'services')
-rw-r--r--services/src/main/java/io/grpc/services/BinlogHelper.java48
-rw-r--r--services/src/test/java/io/grpc/services/BinlogHelperTest.java57
2 files changed, 74 insertions, 31 deletions
diff --git a/services/src/main/java/io/grpc/services/BinlogHelper.java b/services/src/main/java/io/grpc/services/BinlogHelper.java
index 907c488a2..566b63e94 100644
--- a/services/src/main/java/io/grpc/services/BinlogHelper.java
+++ b/services/src/main/java/io/grpc/services/BinlogHelper.java
@@ -62,6 +62,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -77,8 +78,6 @@ final class BinlogHelper {
private static final boolean CLIENT = false;
@VisibleForTesting
- static final CallId emptyCallId = new CallId(0, 0);
- @VisibleForTesting
static final SocketAddress DUMMY_SOCKET = new DummySocketAddress();
@VisibleForTesting
static final boolean DUMMY_IS_COMPRESSED = false;
@@ -104,8 +103,9 @@ final class BinlogHelper {
}
@Override
- void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId) {
+ void logSendInitialMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) {
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
+ .setSequenceIdWithinCall(seq)
.setType(Type.SEND_INITIAL_METADATA)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
@@ -115,8 +115,9 @@ final class BinlogHelper {
@Override
void logRecvInitialMetadata(
- Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) {
+ int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) {
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
+ .setSequenceIdWithinCall(seq)
.setType(Type.RECV_INITIAL_METADATA)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId))
@@ -126,8 +127,9 @@ final class BinlogHelper {
}
@Override
- void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId) {
+ void logTrailingMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) {
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
+ .setSequenceIdWithinCall(seq)
.setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
@@ -137,6 +139,7 @@ final class BinlogHelper {
@Override
<T> void logOutboundMessage(
+ int seq,
Marshaller<T> marshaller,
T message,
boolean compressed,
@@ -146,6 +149,7 @@ final class BinlogHelper {
throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller");
}
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
+ .setSequenceIdWithinCall(seq)
.setType(Type.SEND_MESSAGE)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
@@ -155,6 +159,7 @@ final class BinlogHelper {
@Override
<T> void logInboundMessage(
+ int seq,
Marshaller<T> marshaller,
T message,
boolean compressed,
@@ -164,6 +169,7 @@ final class BinlogHelper {
throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller");
}
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
+ .setSequenceIdWithinCall(seq)
.setType(Type.RECV_MESSAGE)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
@@ -188,20 +194,21 @@ final class BinlogHelper {
* Logs the sending of initial metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration.
*/
- abstract void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId);
+ abstract void logSendInitialMetadata(
+ int seq, Metadata metadata, boolean isServer, CallId callId);
/**
* Logs the receiving of initial metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration.
*/
abstract void logRecvInitialMetadata(
- Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket);
+ int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket);
/**
* Logs the trailing metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration.
*/
- abstract void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId);
+ abstract void logTrailingMetadata(int seq, Metadata metadata, boolean isServer, CallId callId);
/**
* Logs the outbound message. This method logs the appropriate number of bytes from
@@ -210,7 +217,8 @@ final class BinlogHelper {
* This method takes ownership of {@code message}.
*/
abstract <T> void logOutboundMessage(
- Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, CallId callId);
+ int seq, Marshaller<T> marshaller, T message, boolean compressed, boolean isServer,
+ CallId callId);
/**
* Logs the inbound message. This method logs the appropriate number of bytes from
@@ -219,7 +227,8 @@ final class BinlogHelper {
* This method takes ownership of {@code message}.
*/
abstract <T> void logInboundMessage(
- Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, CallId callId);
+ int seq, Marshaller<T> marshaller, T message, boolean compressed, boolean isServer,
+ CallId callId);
/**
* Returns the number bytes of the header this writer will log, according to configuration.
@@ -245,15 +254,17 @@ final class BinlogHelper {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+ final AtomicInteger seq = new AtomicInteger(1);
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
- writer.logSendInitialMetadata(headers, CLIENT, callId);
+ writer.logSendInitialMetadata(seq.getAndIncrement(), headers, CLIENT, callId);
ClientCall.Listener<RespT> wListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
writer.logInboundMessage(
+ seq.getAndIncrement(),
method.getResponseMarshaller(),
message,
DUMMY_IS_COMPRESSED,
@@ -265,13 +276,14 @@ final class BinlogHelper {
@Override
public void onHeaders(Metadata headers) {
SocketAddress peer = getPeerSocket(getAttributes());
- writer.logRecvInitialMetadata(headers, CLIENT, callId, peer);
+ writer.logRecvInitialMetadata(
+ seq.getAndIncrement(), headers, CLIENT, callId, peer);
super.onHeaders(headers);
}
@Override
public void onClose(Status status, Metadata trailers) {
- writer.logTrailingMetadata(trailers, CLIENT, callId);
+ writer.logTrailingMetadata(seq.getAndIncrement(), trailers, CLIENT, callId);
super.onClose(status, trailers);
}
};
@@ -281,6 +293,7 @@ final class BinlogHelper {
@Override
public void sendMessage(ReqT message) {
writer.logOutboundMessage(
+ seq.getAndIncrement(),
method.getRequestMarshaller(),
message,
DUMMY_IS_COMPRESSED,
@@ -300,12 +313,14 @@ final class BinlogHelper {
final ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
+ final AtomicInteger seq = new AtomicInteger(1);
SocketAddress peer = getPeerSocket(call.getAttributes());
- writer.logRecvInitialMetadata(headers, SERVER, callId, peer);
+ writer.logRecvInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId, peer);
ServerCall<ReqT, RespT> wCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
writer.logOutboundMessage(
+ seq.getAndIncrement(),
call.getMethodDescriptor().getResponseMarshaller(),
message,
DUMMY_IS_COMPRESSED,
@@ -316,13 +331,13 @@ final class BinlogHelper {
@Override
public void sendHeaders(Metadata headers) {
- writer.logSendInitialMetadata(headers, SERVER, callId);
+ writer.logSendInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId);
super.sendHeaders(headers);
}
@Override
public void close(Status status, Metadata trailers) {
- writer.logTrailingMetadata(trailers, SERVER, callId);
+ writer.logTrailingMetadata(seq.getAndIncrement(), trailers, SERVER, callId);
super.close(status, trailers);
}
};
@@ -331,6 +346,7 @@ final class BinlogHelper {
@Override
public void onMessage(ReqT message) {
writer.logInboundMessage(
+ seq.getAndIncrement(),
call.getMethodDescriptor().getRequestMarshaller(),
message,
DUMMY_IS_COMPRESSED,
diff --git a/services/src/test/java/io/grpc/services/BinlogHelperTest.java b/services/src/test/java/io/grpc/services/BinlogHelperTest.java
index 9c7417b70..7ad9fa66b 100644
--- a/services/src/test/java/io/grpc/services/BinlogHelperTest.java
+++ b/services/src/test/java/io/grpc/services/BinlogHelperTest.java
@@ -535,9 +535,10 @@ public final class BinlogHelperTest {
@Test
public void logSendInitialMetadata_server() throws Exception {
- sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID);
+ sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -546,9 +547,10 @@ public final class BinlogHelperTest {
@Test
public void logSendInitialMetadata_client() throws Exception {
- sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID);
+ sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -560,9 +562,11 @@ public final class BinlogHelperTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
- sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress);
+ sinkWriterImpl.logRecvInitialMetadata(
+ /*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -575,9 +579,11 @@ public final class BinlogHelperTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
- sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress);
+ sinkWriterImpl.logRecvInitialMetadata(
+ /*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -587,9 +593,10 @@ public final class BinlogHelperTest {
@Test
public void logTrailingMetadata_server() throws Exception {
- sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID);
+ sinkWriterImpl.logTrailingMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_TRAILING_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -598,9 +605,10 @@ public final class BinlogHelperTest {
@Test
public void logTrailingMetadata_client() throws Exception {
- sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID);
+ sinkWriterImpl.logTrailingMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_TRAILING_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -610,18 +618,20 @@ public final class BinlogHelperTest {
@Test
public void logOutboundMessage_server() throws Exception {
sinkWriterImpl.logOutboundMessage(
- BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID);
+ /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_MESSAGE)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logOutboundMessage(
- BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
+ /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_MESSAGE)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -632,18 +642,20 @@ public final class BinlogHelperTest {
@Test
public void logOutboundMessage_client() throws Exception {
sinkWriterImpl.logOutboundMessage(
- BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
+ /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_MESSAGE)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logOutboundMessage(
- BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
+ /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_MESSAGE)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -654,18 +666,20 @@ public final class BinlogHelperTest {
@Test
public void logInboundMessage_server() throws Exception {
sinkWriterImpl.logInboundMessage(
- BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID);
+ /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_MESSAGE)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logInboundMessage(
- BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
+ /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_MESSAGE)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -676,18 +690,20 @@ public final class BinlogHelperTest {
@Test
public void logInboundMessage_client() throws Exception {
sinkWriterImpl.logInboundMessage(
- BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
+ /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_MESSAGE)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logInboundMessage(
- BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
+ /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder()
+ .setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_MESSAGE)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@@ -764,6 +780,7 @@ public final class BinlogHelperTest {
Metadata clientInitial = new Metadata();
interceptedCall.start(mockListener, clientInitial);
verify(mockSinkWriter).logSendInitialMetadata(
+ /*seq=*/ eq(1),
same(clientInitial),
eq(IS_CLIENT),
same(CALL_ID));
@@ -775,7 +792,9 @@ public final class BinlogHelperTest {
{
Metadata serverInitial = new Metadata();
interceptedListener.get().onHeaders(serverInitial);
- verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial),
+ verify(mockSinkWriter).logRecvInitialMetadata(
+ /*seq=*/ eq(2),
+ same(serverInitial),
eq(IS_CLIENT),
same(CALL_ID),
same(peer));
@@ -788,6 +807,7 @@ public final class BinlogHelperTest {
byte[] request = "this is a request".getBytes(US_ASCII);
interceptedCall.sendMessage(request);
verify(mockSinkWriter).logOutboundMessage(
+ /*seq=*/ eq(3),
same(BYTEARRAY_MARSHALLER),
same(request),
eq(BinlogHelper.DUMMY_IS_COMPRESSED),
@@ -802,6 +822,7 @@ public final class BinlogHelperTest {
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedListener.get().onMessage(response);
verify(mockSinkWriter).logInboundMessage(
+ /*seq=*/ eq(4),
same(BYTEARRAY_MARSHALLER),
eq(response),
eq(BinlogHelper.DUMMY_IS_COMPRESSED),
@@ -818,6 +839,7 @@ public final class BinlogHelperTest {
interceptedListener.get().onClose(status, trailers);
verify(mockSinkWriter).logTrailingMetadata(
+ /*seq=*/ eq(5),
same(trailers),
eq(IS_CLIENT),
same(CALL_ID));
@@ -894,6 +916,7 @@ public final class BinlogHelperTest {
}
});
verify(mockSinkWriter).logRecvInitialMetadata(
+ /*seq=*/ eq(1),
same(clientInitial),
eq(IS_SERVER),
same(CALL_ID),
@@ -906,6 +929,7 @@ public final class BinlogHelperTest {
Metadata serverInital = new Metadata();
interceptedCall.get().sendHeaders(serverInital);
verify(mockSinkWriter).logSendInitialMetadata(
+ /*seq=*/ eq(2),
same(serverInital),
eq(IS_SERVER),
same(CALL_ID));
@@ -918,6 +942,7 @@ public final class BinlogHelperTest {
byte[] request = "this is a request".getBytes(US_ASCII);
capturedListener.onMessage(request);
verify(mockSinkWriter).logInboundMessage(
+ /*seq=*/ eq(3),
same(BYTEARRAY_MARSHALLER),
same(request),
eq(BinlogHelper.DUMMY_IS_COMPRESSED),
@@ -932,6 +957,7 @@ public final class BinlogHelperTest {
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedCall.get().sendMessage(response);
verify(mockSinkWriter).logOutboundMessage(
+ /*seq=*/ eq(4),
same(BYTEARRAY_MARSHALLER),
same(response),
eq(BinlogHelper.DUMMY_IS_COMPRESSED),
@@ -947,6 +973,7 @@ public final class BinlogHelperTest {
Metadata trailers = new Metadata();
interceptedCall.get().close(status, trailers);
verify(mockSinkWriter).logTrailingMetadata(
+ /*seq=*/ eq(5),
same(trailers),
eq(IS_SERVER),
same(CALL_ID));