aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2018-03-23 15:44:40 -0700
committerGitHub <noreply@github.com>2018-03-23 15:44:40 -0700
commita5b55bb24c413cef87b135647ea44cf6527a692c (patch)
tree9814315d66509bc6c286c581bf2d9052095bac8a
parentae42d666add7a92073850fbed0bd629e767c12e1 (diff)
downloadgrpc-grpc-java-a5b55bb24c413cef87b135647ea44cf6527a692c.tar.gz
netty,okhttp,testing: always set TRANSPORT_ATTR_REMOTE_ADDR (#4217)
Always set the remote address, no reason why this should be a TLS-only feature. This is needed for channelz, and is especially useful in unit tests where we are using plaintext. This PR adds the attr for plaintext.
-rw-r--r--netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java26
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java7
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java13
-rw-r--r--testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java80
4 files changed, 80 insertions, 46 deletions
diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
index c45a6ac6b..398ef8838 100644
--- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
+++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
@@ -329,7 +329,7 @@ public final class ProtocolNegotiators {
HttpClientCodec httpClientCodec = new HttpClientCodec();
final HttpClientUpgradeHandler upgrader =
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
- return new BufferingHttp2UpgradeHandler(upgrader);
+ return new BufferingHttp2UpgradeHandler(upgrader, handler);
}
}
@@ -662,8 +662,11 @@ public final class ProtocolNegotiators {
private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {
- BufferUntilChannelActiveHandler(ChannelHandler... handlers) {
- super(handlers);
+ private final GrpcHttp2ConnectionHandler handler;
+
+ BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) {
+ super(handler);
+ this.handler = handler;
}
@Override
@@ -679,6 +682,11 @@ public final class ProtocolNegotiators {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
writeBufferedAndRemove(ctx);
+ handler.handleProtocolNegotiationCompleted(
+ Attributes
+ .newBuilder()
+ .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
+ .build());
super.channelActive(ctx);
}
}
@@ -689,8 +697,11 @@ public final class ProtocolNegotiators {
private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {
- BufferingHttp2UpgradeHandler(ChannelHandler... handlers) {
- super(handlers);
+ private final GrpcHttp2ConnectionHandler grpcHandler;
+
+ BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) {
+ super(handler);
+ this.grpcHandler = grpcHandler;
}
@Override
@@ -712,6 +723,11 @@ public final class ProtocolNegotiators {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
writeBufferedAndRemove(ctx);
+ grpcHandler.handleProtocolNegotiationCompleted(
+ Attributes
+ .newBuilder()
+ .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
+ .build());
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
}
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
index 7a5ee95da..0ea8a89f0 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
@@ -58,6 +58,7 @@ class OkHttpClientStream extends AbstractClientStream {
private volatile int id = ABSENT_ID;
private final TransportState state;
private final Sink sink = new Sink();
+ private final Attributes attributes;
private boolean useGet = false;
@@ -83,6 +84,10 @@ class OkHttpClientStream extends AbstractClientStream {
this.method = method;
this.authority = authority;
this.userAgent = userAgent;
+ // OkHttpClientStream is only created after the transport has finished connecting,
+ // so it is safe to read the transport attributes.
+ // We make a copy here for convenience, even though we can ask the transport.
+ this.attributes = transport.getAttributes();
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, frameWriter, outboundFlow,
transport);
}
@@ -123,7 +128,7 @@ class OkHttpClientStream extends AbstractClientStream {
@Override
public Attributes getAttributes() {
- return Attributes.EMPTY;
+ return attributes;
}
class Sink implements AbstractClientStream.Sink {
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
index a1fc9fa1c..f710399c8 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
@@ -31,6 +31,7 @@ import com.squareup.okhttp.Request;
import com.squareup.okhttp.internal.http.StatusLine;
import io.grpc.Attributes;
import io.grpc.CallOptions;
+import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
@@ -149,6 +150,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
private final int maxMessageSize;
private int connectionUnacknowledgedBytesRead;
private ClientFrameHandler clientFrameHandler;
+ // Caution: Not synchronized, new value can only be safely read after the connection is complete.
+ private Attributes attributes = Attributes.EMPTY;
/**
* Indicates the transport is in go-away state: no new streams will be processed, but existing
* streams may continue.
@@ -467,6 +470,12 @@ class OkHttpClientTransport implements ConnectionClientTransport {
sock.setTcpNoDelay(true);
source = Okio.buffer(Okio.source(sock));
sink = Okio.buffer(Okio.sink(sock));
+ // TODO(zhangkun83): fill channel security attributes
+ // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
+ attributes = Attributes
+ .newBuilder()
+ .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
+ .build();
} catch (StatusException e) {
startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
return;
@@ -482,6 +491,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
synchronized (lock) {
socket = sock;
maxConcurrentStreams = Integer.MAX_VALUE;
+
startPendingStreams();
}
@@ -675,8 +685,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
@Override
public Attributes getAttributes() {
- // TODO(zhangkun83): fill channel security attributes
- return Attributes.EMPTY;
+ return attributes;
}
/**
diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
index 430a9c8b1..778475ac2 100644
--- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
+++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java
@@ -68,8 +68,6 @@ import io.grpc.internal.ServerTransportListener;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
@@ -231,7 +229,7 @@ public abstract class AbstractTransportTest {
public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -298,7 +296,7 @@ public abstract class AbstractTransportTest {
server.start(serverListener);
client = newClientTransport(server);
InOrder inOrder = inOrder(mockClientTransportListener);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
@@ -312,7 +310,7 @@ public abstract class AbstractTransportTest {
server.start(serverListener);
client = newClientTransport(server);
InOrder inOrder = inOrder(mockClientTransportListener);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -340,7 +338,7 @@ public abstract class AbstractTransportTest {
public void openStreamPreventsTermination() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -392,7 +390,7 @@ public abstract class AbstractTransportTest {
public void shutdownNowKillsClientStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -429,7 +427,7 @@ public abstract class AbstractTransportTest {
public void shutdownNowKillsServerStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -470,7 +468,7 @@ public abstract class AbstractTransportTest {
public void ping() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class);
try {
client.ping(mockPingCallback, MoreExecutors.directExecutor());
@@ -486,7 +484,7 @@ public abstract class AbstractTransportTest {
public void ping_duringShutdown() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
// Stream prevents termination
ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
@@ -508,7 +506,7 @@ public abstract class AbstractTransportTest {
public void ping_afterTermination() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
@@ -530,7 +528,7 @@ public abstract class AbstractTransportTest {
InOrder inOrder = inOrder(clientStreamTracerFactory);
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
// Stream prevents termination
ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(
@@ -570,7 +568,7 @@ public abstract class AbstractTransportTest {
// dealing with afterTermination is harder than duringShutdown.
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
@@ -594,7 +592,7 @@ public abstract class AbstractTransportTest {
public void transportInUse_normalClose() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase();
stream1.start(clientStreamListener1);
@@ -624,7 +622,7 @@ public abstract class AbstractTransportTest {
public void transportInUse_clientCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase();
stream1.start(clientStreamListener1);
@@ -649,7 +647,7 @@ public abstract class AbstractTransportTest {
InOrder serverInOrder = inOrder(serverStreamTracerFactory);
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -804,7 +802,7 @@ public abstract class AbstractTransportTest {
public void authorityPropagation() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -823,7 +821,7 @@ public abstract class AbstractTransportTest {
public void zeroMessageStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -859,7 +857,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_withServerHeaders() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -894,7 +892,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_noServerHeaders() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -938,7 +936,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_serverFailure() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -1016,7 +1014,7 @@ public abstract class AbstractTransportTest {
public void clientCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -1047,7 +1045,7 @@ public abstract class AbstractTransportTest {
public void clientCancelFromWithinMessageRead() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -1134,7 +1132,7 @@ public abstract class AbstractTransportTest {
public void serverCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -1182,7 +1180,7 @@ public abstract class AbstractTransportTest {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener =
serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -1343,7 +1341,7 @@ public abstract class AbstractTransportTest {
public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -1375,7 +1373,7 @@ public abstract class AbstractTransportTest {
public void interactionsAfterClientStreamCancelAreNoops() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@@ -1411,7 +1409,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_streamStarted() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (!haveTransportTracer()) {
@@ -1496,7 +1494,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_streamEnded_ok() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@@ -1535,7 +1533,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_streamEnded_nonOk() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@@ -1575,7 +1573,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_client_streamEnded_nonOk() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@@ -1610,7 +1608,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_receive_msg() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@@ -1655,7 +1653,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_send_msg() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
- runIfNotNull(client.start(mockClientTransportListener));
+ startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@@ -1700,7 +1698,7 @@ public abstract class AbstractTransportTest {
public void socketStats_addresses() throws Exception {
server.start(serverListener);
ManagedClientTransport client = newClientTransport(server);
- runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
+ startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@@ -1711,9 +1709,7 @@ public abstract class AbstractTransportTest {
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
- // clients do not have TRANSPORT_ATTR_REMOTE_ADDR so use a hack for serverAddress
- SocketAddress serverAddress
- = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), server.getPort());
+ SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketStats clientSocketStats = client.getStats().get();
@@ -1733,7 +1729,8 @@ public abstract class AbstractTransportTest {
*/
private void doPingPong(MockServerListener serverListener) throws Exception {
ManagedClientTransport client = newClientTransport(server);
- runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
+ ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
+ startTransport(client, listener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@@ -1798,6 +1795,13 @@ public abstract class AbstractTransportTest {
}
}
+ private static void startTransport(
+ ManagedClientTransport clientTransport,
+ ManagedClientTransport.Listener listener) {
+ runIfNotNull(clientTransport.start(listener));
+ verify(listener, timeout(100)).transportReady();
+ }
+
private static class MockServerListener implements ServerListener {
public final BlockingQueue<MockServerTransportListener> listeners
= new LinkedBlockingQueue<MockServerTransportListener>();