diff options
author | zpencer <spencerfang@google.com> | 2018-06-07 12:19:36 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-06-07 12:19:36 -0700 |
commit | c05d0f40eae1686d22a4b2347043dd725c7a6f37 (patch) | |
tree | 02033a6c3b9fcdc1c120df7f921b80f3637388e1 /services | |
parent | bc9d3ab7cad944a46c34afcd2bdc28c1b8154543 (diff) | |
download | grpc-grpc-java-c05d0f40eae1686d22a4b2347043dd725c7a6f37.tar.gz |
services: add sequence id within call for entries (#4549)
This is a new field added to the binlog proto so that we can detect when
the storage impl reorders or drops recorded entries.
Diffstat (limited to 'services')
-rw-r--r-- | services/src/main/java/io/grpc/services/BinlogHelper.java | 48 | ||||
-rw-r--r-- | services/src/test/java/io/grpc/services/BinlogHelperTest.java | 57 |
2 files changed, 74 insertions, 31 deletions
diff --git a/services/src/main/java/io/grpc/services/BinlogHelper.java b/services/src/main/java/io/grpc/services/BinlogHelper.java index 907c488a2..566b63e94 100644 --- a/services/src/main/java/io/grpc/services/BinlogHelper.java +++ b/services/src/main/java/io/grpc/services/BinlogHelper.java @@ -62,6 +62,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -77,8 +78,6 @@ final class BinlogHelper { private static final boolean CLIENT = false; @VisibleForTesting - static final CallId emptyCallId = new CallId(0, 0); - @VisibleForTesting static final SocketAddress DUMMY_SOCKET = new DummySocketAddress(); @VisibleForTesting static final boolean DUMMY_IS_COMPRESSED = false; @@ -104,8 +103,9 @@ final class BinlogHelper { } @Override - void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId) { + void logSendInitialMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) { GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(Type.SEND_INITIAL_METADATA) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); @@ -115,8 +115,9 @@ final class BinlogHelper { @Override void logRecvInitialMetadata( - Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) { + int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) { GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(Type.RECV_INITIAL_METADATA) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)) @@ -126,8 +127,9 @@ final class BinlogHelper { } @Override - void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId) { + void logTrailingMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) { GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); @@ -137,6 +139,7 @@ final class BinlogHelper { @Override <T> void logOutboundMessage( + int seq, Marshaller<T> marshaller, T message, boolean compressed, @@ -146,6 +149,7 @@ final class BinlogHelper { throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); } GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(Type.SEND_MESSAGE) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); @@ -155,6 +159,7 @@ final class BinlogHelper { @Override <T> void logInboundMessage( + int seq, Marshaller<T> marshaller, T message, boolean compressed, @@ -164,6 +169,7 @@ final class BinlogHelper { throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); } GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(Type.RECV_MESSAGE) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); @@ -188,20 +194,21 @@ final class BinlogHelper { * Logs the sending of initial metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ - abstract void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId); + abstract void logSendInitialMetadata( + int seq, Metadata metadata, boolean isServer, CallId callId); /** * Logs the receiving of initial metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ abstract void logRecvInitialMetadata( - Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket); + int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket); /** * Logs the trailing metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ - abstract void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId); + abstract void logTrailingMetadata(int seq, Metadata metadata, boolean isServer, CallId callId); /** * Logs the outbound message. This method logs the appropriate number of bytes from @@ -210,7 +217,8 @@ final class BinlogHelper { * This method takes ownership of {@code message}. */ abstract <T> void logOutboundMessage( - Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, CallId callId); + int seq, Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, + CallId callId); /** * Logs the inbound message. This method logs the appropriate number of bytes from @@ -219,7 +227,8 @@ final class BinlogHelper { * This method takes ownership of {@code message}. */ abstract <T> void logInboundMessage( - Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, CallId callId); + int seq, Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, + CallId callId); /** * Returns the number bytes of the header this writer will log, according to configuration. @@ -245,15 +254,17 @@ final class BinlogHelper { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { + final AtomicInteger seq = new AtomicInteger(1); return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { @Override public void start(Listener<RespT> responseListener, Metadata headers) { - writer.logSendInitialMetadata(headers, CLIENT, callId); + writer.logSendInitialMetadata(seq.getAndIncrement(), headers, CLIENT, callId); ClientCall.Listener<RespT> wListener = new SimpleForwardingClientCallListener<RespT>(responseListener) { @Override public void onMessage(RespT message) { writer.logInboundMessage( + seq.getAndIncrement(), method.getResponseMarshaller(), message, DUMMY_IS_COMPRESSED, @@ -265,13 +276,14 @@ final class BinlogHelper { @Override public void onHeaders(Metadata headers) { SocketAddress peer = getPeerSocket(getAttributes()); - writer.logRecvInitialMetadata(headers, CLIENT, callId, peer); + writer.logRecvInitialMetadata( + seq.getAndIncrement(), headers, CLIENT, callId, peer); super.onHeaders(headers); } @Override public void onClose(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, CLIENT, callId); + writer.logTrailingMetadata(seq.getAndIncrement(), trailers, CLIENT, callId); super.onClose(status, trailers); } }; @@ -281,6 +293,7 @@ final class BinlogHelper { @Override public void sendMessage(ReqT message) { writer.logOutboundMessage( + seq.getAndIncrement(), method.getRequestMarshaller(), message, DUMMY_IS_COMPRESSED, @@ -300,12 +313,14 @@ final class BinlogHelper { final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { + final AtomicInteger seq = new AtomicInteger(1); SocketAddress peer = getPeerSocket(call.getAttributes()); - writer.logRecvInitialMetadata(headers, SERVER, callId, peer); + writer.logRecvInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId, peer); ServerCall<ReqT, RespT> wCall = new SimpleForwardingServerCall<ReqT, RespT>(call) { @Override public void sendMessage(RespT message) { writer.logOutboundMessage( + seq.getAndIncrement(), call.getMethodDescriptor().getResponseMarshaller(), message, DUMMY_IS_COMPRESSED, @@ -316,13 +331,13 @@ final class BinlogHelper { @Override public void sendHeaders(Metadata headers) { - writer.logSendInitialMetadata(headers, SERVER, callId); + writer.logSendInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId); super.sendHeaders(headers); } @Override public void close(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, SERVER, callId); + writer.logTrailingMetadata(seq.getAndIncrement(), trailers, SERVER, callId); super.close(status, trailers); } }; @@ -331,6 +346,7 @@ final class BinlogHelper { @Override public void onMessage(ReqT message) { writer.logInboundMessage( + seq.getAndIncrement(), call.getMethodDescriptor().getRequestMarshaller(), message, DUMMY_IS_COMPRESSED, diff --git a/services/src/test/java/io/grpc/services/BinlogHelperTest.java b/services/src/test/java/io/grpc/services/BinlogHelperTest.java index 9c7417b70..7ad9fa66b 100644 --- a/services/src/test/java/io/grpc/services/BinlogHelperTest.java +++ b/services/src/test/java/io/grpc/services/BinlogHelperTest.java @@ -535,9 +535,10 @@ public final class BinlogHelperTest { @Test public void logSendInitialMetadata_server() throws Exception { - sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); + sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -546,9 +547,10 @@ public final class BinlogHelperTest { @Test public void logSendInitialMetadata_client() throws Exception { - sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); + sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -560,9 +562,11 @@ public final class BinlogHelperTest { InetAddress address = InetAddress.getByName("127.0.0.1"); int port = 12345; InetSocketAddress socketAddress = new InetSocketAddress(address, port); - sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress); + sinkWriterImpl.logRecvInitialMetadata( + /*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -575,9 +579,11 @@ public final class BinlogHelperTest { InetAddress address = InetAddress.getByName("127.0.0.1"); int port = 12345; InetSocketAddress socketAddress = new InetSocketAddress(address, port); - sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress); + sinkWriterImpl.logRecvInitialMetadata( + /*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -587,9 +593,10 @@ public final class BinlogHelperTest { @Test public void logTrailingMetadata_server() throws Exception { - sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); + sinkWriterImpl.logTrailingMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_TRAILING_METADATA) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -598,9 +605,10 @@ public final class BinlogHelperTest { @Test public void logTrailingMetadata_client() throws Exception { - sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); + sinkWriterImpl.logTrailingMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_TRAILING_METADATA) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -610,18 +618,20 @@ public final class BinlogHelperTest { @Test public void logOutboundMessage_server() throws Exception { sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_MESSAGE) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) .build()); sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_MESSAGE) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -632,18 +642,20 @@ public final class BinlogHelperTest { @Test public void logOutboundMessage_client() throws Exception { sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_MESSAGE) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) .build()); sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_MESSAGE) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -654,18 +666,20 @@ public final class BinlogHelperTest { @Test public void logInboundMessage_server() throws Exception { sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_MESSAGE) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) .build()); sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_MESSAGE) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -676,18 +690,20 @@ public final class BinlogHelperTest { @Test public void logInboundMessage_client() throws Exception { sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_MESSAGE) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) .build()); sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_MESSAGE) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -764,6 +780,7 @@ public final class BinlogHelperTest { Metadata clientInitial = new Metadata(); interceptedCall.start(mockListener, clientInitial); verify(mockSinkWriter).logSendInitialMetadata( + /*seq=*/ eq(1), same(clientInitial), eq(IS_CLIENT), same(CALL_ID)); @@ -775,7 +792,9 @@ public final class BinlogHelperTest { { Metadata serverInitial = new Metadata(); interceptedListener.get().onHeaders(serverInitial); - verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial), + verify(mockSinkWriter).logRecvInitialMetadata( + /*seq=*/ eq(2), + same(serverInitial), eq(IS_CLIENT), same(CALL_ID), same(peer)); @@ -788,6 +807,7 @@ public final class BinlogHelperTest { byte[] request = "this is a request".getBytes(US_ASCII); interceptedCall.sendMessage(request); verify(mockSinkWriter).logOutboundMessage( + /*seq=*/ eq(3), same(BYTEARRAY_MARSHALLER), same(request), eq(BinlogHelper.DUMMY_IS_COMPRESSED), @@ -802,6 +822,7 @@ public final class BinlogHelperTest { byte[] response = "this is a response".getBytes(US_ASCII); interceptedListener.get().onMessage(response); verify(mockSinkWriter).logInboundMessage( + /*seq=*/ eq(4), same(BYTEARRAY_MARSHALLER), eq(response), eq(BinlogHelper.DUMMY_IS_COMPRESSED), @@ -818,6 +839,7 @@ public final class BinlogHelperTest { interceptedListener.get().onClose(status, trailers); verify(mockSinkWriter).logTrailingMetadata( + /*seq=*/ eq(5), same(trailers), eq(IS_CLIENT), same(CALL_ID)); @@ -894,6 +916,7 @@ public final class BinlogHelperTest { } }); verify(mockSinkWriter).logRecvInitialMetadata( + /*seq=*/ eq(1), same(clientInitial), eq(IS_SERVER), same(CALL_ID), @@ -906,6 +929,7 @@ public final class BinlogHelperTest { Metadata serverInital = new Metadata(); interceptedCall.get().sendHeaders(serverInital); verify(mockSinkWriter).logSendInitialMetadata( + /*seq=*/ eq(2), same(serverInital), eq(IS_SERVER), same(CALL_ID)); @@ -918,6 +942,7 @@ public final class BinlogHelperTest { byte[] request = "this is a request".getBytes(US_ASCII); capturedListener.onMessage(request); verify(mockSinkWriter).logInboundMessage( + /*seq=*/ eq(3), same(BYTEARRAY_MARSHALLER), same(request), eq(BinlogHelper.DUMMY_IS_COMPRESSED), @@ -932,6 +957,7 @@ public final class BinlogHelperTest { byte[] response = "this is a response".getBytes(US_ASCII); interceptedCall.get().sendMessage(response); verify(mockSinkWriter).logOutboundMessage( + /*seq=*/ eq(4), same(BYTEARRAY_MARSHALLER), same(response), eq(BinlogHelper.DUMMY_IS_COMPRESSED), @@ -947,6 +973,7 @@ public final class BinlogHelperTest { Metadata trailers = new Metadata(); interceptedCall.get().close(status, trailers); verify(mockSinkWriter).logTrailingMetadata( + /*seq=*/ eq(5), same(trailers), eq(IS_SERVER), same(CALL_ID)); |