aboutsummaryrefslogtreecommitdiff
path: root/testing
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2017-11-02 15:44:21 -0700
committerGitHub <noreply@github.com>2017-11-02 15:44:21 -0700
commit2162cd07d5debe833bfdfe3512f30ed085140b91 (patch)
tree231d858d2eda19e3d21cf0e4c2dd18013e73e8d2 /testing
parent4c96ebd6d410923d78ab110c36bf8b3b1402126c (diff)
downloadgrpc-grpc-java-2162cd07d5debe833bfdfe3512f30ed085140b91.tar.gz
netty,core: add a TransportTracer class (#3454)
Diffstat (limited to 'testing')
-rw-r--r--testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java192
1 files changed, 192 insertions, 0 deletions
diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
index b784aad4c..b08d6e07a 100644
--- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
+++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
@@ -62,6 +62,7 @@ import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
+import io.grpc.internal.TransportTracer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -1370,6 +1371,197 @@ public abstract class AbstractTransportTest {
doPingPong(serverListener);
}
+ // Not all transports support the tracer yet
+ protected boolean haveTransportTracer() {
+ return false;
+ }
+
+ @Test
+ public void transportTracer_streamStarted() throws Exception {
+ server.start(serverListener);
+ client = newClientTransport(server);
+ runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
+ MockServerTransportListener serverTransportListener
+ = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ if (!haveTransportTracer()) {
+ return;
+ }
+
+ // start first stream
+ long firstTimestamp;
+ {
+ TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(0, before.streamsStarted);
+ assertEquals(0, before.lastStreamCreatedTimeNanos);
+
+ ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
+ ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
+ clientStream.start(clientStreamListener);
+ StreamCreation serverStreamCreation = serverTransportListener
+ .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(1, after.streamsStarted);
+ firstTimestamp = TimeUnit.NANOSECONDS.toMillis(after.lastStreamCreatedTimeNanos);
+ assertThat(System.currentTimeMillis() - firstTimestamp).isAtMost(50L);
+
+ ServerStream serverStream = serverStreamCreation.stream;
+ serverStream.close(Status.OK, new Metadata());
+ }
+
+ // start second stream
+ {
+ TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(1, before.streamsStarted);
+
+ ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
+ ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
+ clientStream.start(clientStreamListener);
+ StreamCreation serverStreamCreation = serverTransportListener
+ .takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(2, after.streamsStarted);
+ assertTrue(after.lastStreamCreatedTimeNanos > firstTimestamp);
+ long secondTimestamp = TimeUnit.NANOSECONDS.toMillis(after.lastStreamCreatedTimeNanos);
+ assertThat(System.currentTimeMillis() - secondTimestamp).isAtMost(50L);
+
+ ServerStream serverStream = serverStreamCreation.stream;
+ serverStream.close(Status.OK, new Metadata());
+ }
+ }
+
+ @Test
+ public void transportTracer_streamEnded_ok() throws Exception {
+ server.start(serverListener);
+ client = newClientTransport(server);
+ runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
+ ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
+ ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
+ clientStream.start(clientStreamListener);
+ MockServerTransportListener serverTransportListener
+ = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ StreamCreation serverStreamCreation
+ = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ ServerStream serverStream = serverStreamCreation.stream;
+ if (!haveTransportTracer()) {
+ return;
+ }
+
+ TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(0, before.streamsSucceeded);
+
+ serverStream.close(Status.OK, new Metadata());
+ // Block until the close actually happened before verifying stats
+ serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(1, after.streamsSucceeded);
+ assertEquals(0, after.streamsFailed);
+
+ }
+
+ @Test
+ public void transportTracer_streamEnded_nonOk() throws Exception {
+ server.start(serverListener);
+ client = newClientTransport(server);
+ runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
+ ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
+ ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
+ clientStream.start(clientStreamListener);
+ MockServerTransportListener serverTransportListener
+ = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ StreamCreation serverStreamCreation
+ = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ ServerStream serverStream = serverStreamCreation.stream;
+ if (!haveTransportTracer()) {
+ return;
+ }
+
+ TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(0, before.streamsFailed);
+
+ serverStream.close(Status.UNKNOWN, new Metadata());
+ // Block until the close actually happened before verifying stats
+ serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(1, after.streamsFailed);
+ assertEquals(0, after.streamsSucceeded);
+ }
+
+ @Test
+ public void transportTracer_receive_msg() throws Exception {
+ server.start(serverListener);
+ client = newClientTransport(server);
+ runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
+ ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
+ ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
+ clientStream.start(clientStreamListener);
+ MockServerTransportListener serverTransportListener
+ = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ StreamCreation serverStreamCreation
+ = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ ServerStream serverStream = serverStreamCreation.stream;
+ ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
+ if (!haveTransportTracer()) {
+ return;
+ }
+
+ TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(0, before.messagesReceived);
+ assertEquals(0, before.lastMessageReceivedTimeNanos);
+
+ serverStream.request(1);
+ clientStream.writeMessage(methodDescriptor.streamRequest("request"));
+ clientStream.flush();
+ clientStream.halfClose();
+ verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
+
+ TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(1, after.messagesReceived);
+ long timestamp = TimeUnit.NANOSECONDS.toMillis(after.lastMessageReceivedTimeNanos);
+ assertThat(System.currentTimeMillis() - timestamp).isAtMost(50L);
+
+ serverStream.close(Status.OK, new Metadata());
+ }
+
+ @Test
+ public void transportTracer_send_msg() throws Exception {
+ server.start(serverListener);
+ client = newClientTransport(server);
+ runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
+ ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
+ ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
+ clientStream.start(clientStreamListener);
+ MockServerTransportListener serverTransportListener
+ = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ StreamCreation serverStreamCreation
+ = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ ServerStream serverStream = serverStreamCreation.stream;
+ if (!haveTransportTracer()) {
+ return;
+ }
+
+ TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(0, before.messagesSent);
+ assertEquals(0, before.lastMessageSentTimeNanos);
+
+ clientStream.request(1);
+ serverStream.writeHeaders(new Metadata());
+ serverStream.writeMessage(methodDescriptor.streamResponse("response"));
+ serverStream.flush();
+ verifyMessageCountAndClose(clientStreamListener.messageQueue, 1);
+
+ TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get();
+ assertEquals(1, after.messagesSent);
+ long timestamp = TimeUnit.NANOSECONDS.toMillis(after.lastMessageSentTimeNanos);
+ assertThat(System.currentTimeMillis() - timestamp).isAtMost(50L);
+
+
+ serverStream.close(Status.OK, new Metadata());
+ }
+
/**
* Helper that simply does an RPC. It can be used similar to a sleep for negative testing: to give
* time for actions _not_ to happen. Since it is based on doing an actual RPC with actual