diff options
author | zpencer <spencerfang@google.com> | 2018-03-23 15:44:40 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-23 15:44:40 -0700 |
commit | a5b55bb24c413cef87b135647ea44cf6527a692c (patch) | |
tree | 9814315d66509bc6c286c581bf2d9052095bac8a | |
parent | ae42d666add7a92073850fbed0bd629e767c12e1 (diff) | |
download | grpc-grpc-java-a5b55bb24c413cef87b135647ea44cf6527a692c.tar.gz |
netty,okhttp,testing: always set TRANSPORT_ATTR_REMOTE_ADDR (#4217)
Always set the remote address, no reason why this should be a TLS-only
feature. This is needed for channelz, and is especially useful in unit
tests where we are using plaintext.
This PR adds the attr for plaintext.
4 files changed, 80 insertions, 46 deletions
diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java index c45a6ac6b..398ef8838 100644 --- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java +++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java @@ -329,7 +329,7 @@ public final class ProtocolNegotiators { HttpClientCodec httpClientCodec = new HttpClientCodec(); final HttpClientUpgradeHandler upgrader = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000); - return new BufferingHttp2UpgradeHandler(upgrader); + return new BufferingHttp2UpgradeHandler(upgrader, handler); } } @@ -662,8 +662,11 @@ public final class ProtocolNegotiators { private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler implements ProtocolNegotiator.Handler { - BufferUntilChannelActiveHandler(ChannelHandler... handlers) { - super(handlers); + private final GrpcHttp2ConnectionHandler handler; + + BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) { + super(handler); + this.handler = handler; } @Override @@ -679,6 +682,11 @@ public final class ProtocolNegotiators { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { writeBufferedAndRemove(ctx); + handler.handleProtocolNegotiationCompleted( + Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) + .build()); super.channelActive(ctx); } } @@ -689,8 +697,11 @@ public final class ProtocolNegotiators { private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler implements ProtocolNegotiator.Handler { - BufferingHttp2UpgradeHandler(ChannelHandler... handlers) { - super(handlers); + private final GrpcHttp2ConnectionHandler grpcHandler; + + BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) { + super(handler); + this.grpcHandler = grpcHandler; } @Override @@ -712,6 +723,11 @@ public final class ProtocolNegotiators { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) { writeBufferedAndRemove(ctx); + grpcHandler.handleProtocolNegotiationCompleted( + Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) + .build()); } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) { fail(ctx, unavailableException("HTTP/2 upgrade rejected")); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 7a5ee95da..0ea8a89f0 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -58,6 +58,7 @@ class OkHttpClientStream extends AbstractClientStream { private volatile int id = ABSENT_ID; private final TransportState state; private final Sink sink = new Sink(); + private final Attributes attributes; private boolean useGet = false; @@ -83,6 +84,10 @@ class OkHttpClientStream extends AbstractClientStream { this.method = method; this.authority = authority; this.userAgent = userAgent; + // OkHttpClientStream is only created after the transport has finished connecting, + // so it is safe to read the transport attributes. + // We make a copy here for convenience, even though we can ask the transport. + this.attributes = transport.getAttributes(); this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, frameWriter, outboundFlow, transport); } @@ -123,7 +128,7 @@ class OkHttpClientStream extends AbstractClientStream { @Override public Attributes getAttributes() { - return Attributes.EMPTY; + return attributes; } class Sink implements AbstractClientStream.Sink { diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index a1fc9fa1c..f710399c8 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -31,6 +31,7 @@ import com.squareup.okhttp.Request; import com.squareup.okhttp.internal.http.StatusLine; import io.grpc.Attributes; import io.grpc.CallOptions; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -149,6 +150,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { private final int maxMessageSize; private int connectionUnacknowledgedBytesRead; private ClientFrameHandler clientFrameHandler; + // Caution: Not synchronized, new value can only be safely read after the connection is complete. + private Attributes attributes = Attributes.EMPTY; /** * Indicates the transport is in go-away state: no new streams will be processed, but existing * streams may continue. @@ -467,6 +470,12 @@ class OkHttpClientTransport implements ConnectionClientTransport { sock.setTcpNoDelay(true); source = Okio.buffer(Okio.source(sock)); sink = Okio.buffer(Okio.sink(sock)); + // TODO(zhangkun83): fill channel security attributes + // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info + attributes = Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress()) + .build(); } catch (StatusException e) { startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus()); return; @@ -482,6 +491,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { synchronized (lock) { socket = sock; maxConcurrentStreams = Integer.MAX_VALUE; + startPendingStreams(); } @@ -675,8 +685,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { @Override public Attributes getAttributes() { - // TODO(zhangkun83): fill channel security attributes - return Attributes.EMPTY; + return attributes; } /** diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 430a9c8b1..778475ac2 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -68,8 +68,6 @@ import io.grpc.internal.ServerTransportListener; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Arrays; import java.util.List; @@ -231,7 +229,7 @@ public abstract class AbstractTransportTest { public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -298,7 +296,7 @@ public abstract class AbstractTransportTest { server.start(serverListener); client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); client.shutdown(shutdownReason); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated(); @@ -312,7 +310,7 @@ public abstract class AbstractTransportTest { server.start(serverListener); client = newClientTransport(server); InOrder inOrder = inOrder(mockClientTransportListener); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -340,7 +338,7 @@ public abstract class AbstractTransportTest { public void openStreamPreventsTermination() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -392,7 +390,7 @@ public abstract class AbstractTransportTest { public void shutdownNowKillsClientStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -429,7 +427,7 @@ public abstract class AbstractTransportTest { public void shutdownNowKillsServerStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -470,7 +468,7 @@ public abstract class AbstractTransportTest { public void ping() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class); try { client.ping(mockPingCallback, MoreExecutors.directExecutor()); @@ -486,7 +484,7 @@ public abstract class AbstractTransportTest { public void ping_duringShutdown() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); @@ -508,7 +506,7 @@ public abstract class AbstractTransportTest { public void ping_afterTermination() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); client.shutdown(shutdownReason); @@ -530,7 +528,7 @@ public abstract class AbstractTransportTest { InOrder inOrder = inOrder(clientStreamTracerFactory); server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); inOrder.verify(clientStreamTracerFactory).newClientStreamTracer( @@ -570,7 +568,7 @@ public abstract class AbstractTransportTest { // dealing with afterTermination is harder than duringShutdown. server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady(); Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called"); client.shutdown(shutdownReason); @@ -594,7 +592,7 @@ public abstract class AbstractTransportTest { public void transportInUse_normalClose() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase(); stream1.start(clientStreamListener1); @@ -624,7 +622,7 @@ public abstract class AbstractTransportTest { public void transportInUse_clientCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase(); stream1.start(clientStreamListener1); @@ -649,7 +647,7 @@ public abstract class AbstractTransportTest { InOrder serverInOrder = inOrder(serverStreamTracerFactory); server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -804,7 +802,7 @@ public abstract class AbstractTransportTest { public void authorityPropagation() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -823,7 +821,7 @@ public abstract class AbstractTransportTest { public void zeroMessageStream() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -859,7 +857,7 @@ public abstract class AbstractTransportTest { public void earlyServerClose_withServerHeaders() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -894,7 +892,7 @@ public abstract class AbstractTransportTest { public void earlyServerClose_noServerHeaders() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -938,7 +936,7 @@ public abstract class AbstractTransportTest { public void earlyServerClose_serverFailure() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1016,7 +1014,7 @@ public abstract class AbstractTransportTest { public void clientCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1047,7 +1045,7 @@ public abstract class AbstractTransportTest { public void clientCancelFromWithinMessageRead() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1134,7 +1132,7 @@ public abstract class AbstractTransportTest { public void serverCancel() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1182,7 +1180,7 @@ public abstract class AbstractTransportTest { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1343,7 +1341,7 @@ public abstract class AbstractTransportTest { public void interactionsAfterServerStreamCloseAreNoops() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1375,7 +1373,7 @@ public abstract class AbstractTransportTest { public void interactionsAfterClientStreamCancelAreNoops() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; @@ -1411,7 +1409,7 @@ public abstract class AbstractTransportTest { public void transportTracer_streamStarted() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); MockServerTransportListener serverTransportListener = serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); if (!haveTransportTracer()) { @@ -1496,7 +1494,7 @@ public abstract class AbstractTransportTest { public void transportTracer_server_streamEnded_ok() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1535,7 +1533,7 @@ public abstract class AbstractTransportTest { public void transportTracer_server_streamEnded_nonOk() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1575,7 +1573,7 @@ public abstract class AbstractTransportTest { public void transportTracer_client_streamEnded_nonOk() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1610,7 +1608,7 @@ public abstract class AbstractTransportTest { public void transportTracer_server_receive_msg() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1655,7 +1653,7 @@ public abstract class AbstractTransportTest { public void transportTracer_server_send_msg() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mockClientTransportListener)); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1700,7 +1698,7 @@ public abstract class AbstractTransportTest { public void socketStats_addresses() throws Exception { server.start(serverListener); ManagedClientTransport client = newClientTransport(server); - runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + startTransport(client, mockClientTransportListener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1711,9 +1709,7 @@ public abstract class AbstractTransportTest { = serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); ServerStream serverStream = serverStreamCreation.stream; - // clients do not have TRANSPORT_ATTR_REMOTE_ADDR so use a hack for serverAddress - SocketAddress serverAddress - = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), server.getPort()); + SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); SocketStats clientSocketStats = client.getStats().get(); @@ -1733,7 +1729,8 @@ public abstract class AbstractTransportTest { */ private void doPingPong(MockServerListener serverListener) throws Exception { ManagedClientTransport client = newClientTransport(server); - runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); + startTransport(client, listener); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1798,6 +1795,13 @@ public abstract class AbstractTransportTest { } } + private static void startTransport( + ManagedClientTransport clientTransport, + ManagedClientTransport.Listener listener) { + runIfNotNull(clientTransport.start(listener)); + verify(listener, timeout(100)).transportReady(); + } + private static class MockServerListener implements ServerListener { public final BlockingQueue<MockServerTransportListener> listeners = new LinkedBlockingQueue<MockServerTransportListener>(); |