diff options
author | zpencer <spencerfang@google.com> | 2017-11-02 15:44:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-02 15:44:21 -0700 |
commit | 2162cd07d5debe833bfdfe3512f30ed085140b91 (patch) | |
tree | 231d858d2eda19e3d21cf0e4c2dd18013e73e8d2 /testing | |
parent | 4c96ebd6d410923d78ab110c36bf8b3b1402126c (diff) | |
download | grpc-grpc-java-2162cd07d5debe833bfdfe3512f30ed085140b91.tar.gz |
netty,core: add a TransportTracer class (#3454)
Diffstat (limited to 'testing')
-rw-r--r-- | testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java | 192 |
1 files changed, 192 insertions, 0 deletions
diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index b784aad4c..b08d6e07a 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -62,6 +62,7 @@ import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.TransportTracer; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -1370,6 +1371,197 @@ public abstract class AbstractTransportTest { doPingPong(serverListener); } + // Not all transports support the tracer yet + protected boolean haveTransportTracer() { + return false; + } + + @Test + public void transportTracer_streamStarted() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + if (!haveTransportTracer()) { + return; + } + + // start first stream + long firstTimestamp; + { + TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, before.streamsStarted); + assertEquals(0, before.lastStreamCreatedTimeNanos); + + ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + StreamCreation serverStreamCreation = serverTransportListener + .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, after.streamsStarted); + firstTimestamp = TimeUnit.NANOSECONDS.toMillis(after.lastStreamCreatedTimeNanos); + assertThat(System.currentTimeMillis() - firstTimestamp).isAtMost(50L); + + ServerStream serverStream = serverStreamCreation.stream; + serverStream.close(Status.OK, new Metadata()); + } + + // start second stream + { + TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, before.streamsStarted); + + ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + StreamCreation serverStreamCreation = serverTransportListener + .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); + assertEquals(2, after.streamsStarted); + assertTrue(after.lastStreamCreatedTimeNanos > firstTimestamp); + long secondTimestamp = TimeUnit.NANOSECONDS.toMillis(after.lastStreamCreatedTimeNanos); + assertThat(System.currentTimeMillis() - secondTimestamp).isAtMost(50L); + + ServerStream serverStream = serverStreamCreation.stream; + serverStream.close(Status.OK, new Metadata()); + } + } + + @Test + public void transportTracer_streamEnded_ok() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + StreamCreation serverStreamCreation + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + ServerStream serverStream = serverStreamCreation.stream; + if (!haveTransportTracer()) { + return; + } + + TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, before.streamsSucceeded); + + serverStream.close(Status.OK, new Metadata()); + // Block until the close actually happened before verifying stats + serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, after.streamsSucceeded); + assertEquals(0, after.streamsFailed); + + } + + @Test + public void transportTracer_streamEnded_nonOk() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + StreamCreation serverStreamCreation + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + ServerStream serverStream = serverStreamCreation.stream; + if (!haveTransportTracer()) { + return; + } + + TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, before.streamsFailed); + + serverStream.close(Status.UNKNOWN, new Metadata()); + // Block until the close actually happened before verifying stats + serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + + TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, after.streamsFailed); + assertEquals(0, after.streamsSucceeded); + } + + @Test + public void transportTracer_receive_msg() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + StreamCreation serverStreamCreation + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + ServerStream serverStream = serverStreamCreation.stream; + ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; + if (!haveTransportTracer()) { + return; + } + + TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, before.messagesReceived); + assertEquals(0, before.lastMessageReceivedTimeNanos); + + serverStream.request(1); + clientStream.writeMessage(methodDescriptor.streamRequest("request")); + clientStream.flush(); + clientStream.halfClose(); + verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); + + TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, after.messagesReceived); + long timestamp = TimeUnit.NANOSECONDS.toMillis(after.lastMessageReceivedTimeNanos); + assertThat(System.currentTimeMillis() - timestamp).isAtMost(50L); + + serverStream.close(Status.OK, new Metadata()); + } + + @Test + public void transportTracer_send_msg() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + MockServerTransportListener serverTransportListener + = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + StreamCreation serverStreamCreation + = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + ServerStream serverStream = serverStreamCreation.stream; + if (!haveTransportTracer()) { + return; + } + + TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, before.messagesSent); + assertEquals(0, before.lastMessageSentTimeNanos); + + clientStream.request(1); + serverStream.writeHeaders(new Metadata()); + serverStream.writeMessage(methodDescriptor.streamResponse("response")); + serverStream.flush(); + verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); + + TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, after.messagesSent); + long timestamp = TimeUnit.NANOSECONDS.toMillis(after.lastMessageSentTimeNanos); + assertThat(System.currentTimeMillis() - timestamp).isAtMost(50L); + + + serverStream.close(Status.OK, new Metadata()); + } + /** * Helper that simply does an RPC. It can be used similar to a sleep for negative testing: to give * time for actions _not_ to happen. Since it is based on doing an actual RPC with actual |