diff options
author | zpencer <spencerfang@google.com> | 2017-11-02 15:44:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-02 15:44:21 -0700 |
commit | 2162cd07d5debe833bfdfe3512f30ed085140b91 (patch) | |
tree | 231d858d2eda19e3d21cf0e4c2dd18013e73e8d2 /netty | |
parent | 4c96ebd6d410923d78ab110c36bf8b3b1402126c (diff) | |
download | grpc-grpc-java-2162cd07d5debe833bfdfe3512f30ed085140b91.tar.gz |
netty,core: add a TransportTracer class (#3454)
Diffstat (limited to 'netty')
8 files changed, 209 insertions, 42 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index f85e6b702..79b102362 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -40,6 +40,7 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.LogExceptionRunnable; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; @@ -62,6 +63,7 @@ import io.netty.handler.codec.http2.Http2ConnectionEncoder; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception.StreamException; +import io.netty.handler.codec.http2.Http2FlowController; import io.netty.handler.codec.http2.Http2FrameAdapter; import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2FrameReader; @@ -102,6 +104,7 @@ class NettyServerHandler extends AbstractNettyHandler { private final long maxConnectionAgeInNanos; private final long maxConnectionAgeGraceInNanos; private final List<ServerStreamTracer.Factory> streamTracerFactories; + private final TransportTracer transportTracer; private final KeepAliveEnforcer keepAliveEnforcer; private Attributes attributes; private Throwable connectionError; @@ -118,6 +121,7 @@ class NettyServerHandler extends AbstractNettyHandler { static NettyServerHandler newHandler( ServerTransportListener transportListener, List<ServerStreamTracer.Factory> streamTracerFactories, + TransportTracer transportTracer, int maxStreams, int flowControlWindow, int maxHeaderListSize, @@ -137,12 +141,22 @@ class NettyServerHandler extends AbstractNettyHandler { Http2FrameWriter frameWriter = new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); return newHandler( - frameReader, frameWriter, transportListener, streamTracerFactories, - maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize, - keepAliveTimeInNanos, keepAliveTimeoutInNanos, + frameReader, + frameWriter, + transportListener, + streamTracerFactories, + transportTracer, + maxStreams, + flowControlWindow, + maxHeaderListSize, + maxMessageSize, + keepAliveTimeInNanos, + keepAliveTimeoutInNanos, maxConnectionIdleInNanos, - maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, - permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); + maxConnectionAgeInNanos, + maxConnectionAgeGraceInNanos, + permitKeepAliveWithoutCalls, + permitKeepAliveTimeInNanos); } @VisibleForTesting @@ -150,6 +164,7 @@ class NettyServerHandler extends AbstractNettyHandler { Http2FrameReader frameReader, Http2FrameWriter frameWriter, ServerTransportListener transportListener, List<ServerStreamTracer.Factory> streamTracerFactories, + TransportTracer transportTracer, int maxStreams, int flowControlWindow, int maxHeaderListSize, @@ -178,7 +193,6 @@ class NettyServerHandler extends AbstractNettyHandler { // Create the local flow controller configured to auto-refill the connection window. connection.local().flowController( new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); - frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, @@ -193,6 +207,7 @@ class NettyServerHandler extends AbstractNettyHandler { connection, transportListener, streamTracerFactories, + transportTracer, decoder, encoder, settings, maxMessageSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, @@ -205,8 +220,10 @@ class NettyServerHandler extends AbstractNettyHandler { final Http2Connection connection, ServerTransportListener transportListener, List<ServerStreamTracer.Factory> streamTracerFactories, + TransportTracer transportTracer, Http2ConnectionDecoder decoder, - Http2ConnectionEncoder encoder, Http2Settings settings, + Http2ConnectionEncoder encoder, + Http2Settings settings, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, @@ -273,6 +290,7 @@ class NettyServerHandler extends AbstractNettyHandler { streamKey = encoder.connection().newKey(); this.transportListener = checkNotNull(transportListener, "transportListener"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); + this.transportTracer = checkNotNull(transportTracer, "transportTracer"); // Set the frame listener on the decoder. decoder().frameListener(new FrameListener()); @@ -328,6 +346,25 @@ class NettyServerHandler extends AbstractNettyHandler { keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */); keepAliveManager.onTransportStarted(); } + + + if (transportTracer != null) { + assert encoder().connection().equals(decoder().connection()); + final Http2Connection connection = encoder().connection(); + transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() { + private final Http2FlowController local = connection.local().flowController(); + private final Http2FlowController remote = connection.remote().flowController(); + + @Override + public TransportTracer.FlowControlWindows read() { + assert ctx.executor().inEventLoop(); + return new TransportTracer.FlowControlWindows( + local.windowSize(connection.connectionStream()), + remote.windowSize(connection.connectionStream())); + } + }); + } + super.handlerAdded(ctx); } @@ -354,10 +391,20 @@ class NettyServerHandler extends AbstractNettyHandler { StatsTraceContext.newServerContext(streamTracerFactories, method, metadata); NettyServerStream.TransportState state = new NettyServerStream.TransportState( - this, ctx.channel().eventLoop(), http2Stream, maxMessageSize, statsTraceCtx); + this, + ctx.channel().eventLoop(), + http2Stream, + maxMessageSize, + statsTraceCtx, + transportTracer); String authority = getOrUpdateAuthority((AsciiString)headers.authority()); - NettyServerStream stream = new NettyServerStream(ctx.channel(), state, attributes, - authority, statsTraceCtx); + NettyServerStream stream = new NettyServerStream( + ctx.channel(), + state, + attributes, + authority, + statsTraceCtx, + transportTracer); transportListener.streamCreated(stream, method, metadata); state.onStreamAllocated(); http2Stream.setProperty(streamKey, state); @@ -555,6 +602,7 @@ class NettyServerHandler extends AbstractNettyHandler { } if (cmd.endOfStream()) { closeStreamWhenDone(promise, streamId); + transportTracer.reportStreamClosed(cmd.status()); } encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); } @@ -712,8 +760,19 @@ class NettyServerHandler extends AbstractNettyHandler { @Override public void ping() { - encoder().writePing(ctx, false /* isAck */, KEEPALIVE_PING_BUF, ctx.newPromise()); + ChannelFuture pingFuture = encoder().writePing( + ctx, false /* isAck */, KEEPALIVE_PING_BUF, ctx.newPromise()); ctx.flush(); + if (transportTracer != null) { + pingFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + transportTracer.reportKeepAliveSent(); + } + } + }); + } } @Override diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 869a38d42..37c91a50a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -23,6 +23,7 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.AbstractServerStream; import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; import io.grpc.internal.WritableBuffer; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; @@ -48,9 +49,14 @@ class NettyServerStream extends AbstractServerStream { private final Attributes attributes; private final String authority; - public NettyServerStream(Channel channel, TransportState state, Attributes transportAttrs, - String authority, StatsTraceContext statsTraceCtx) { - super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx); + public NettyServerStream( + Channel channel, + TransportState state, + Attributes transportAttrs, + String authority, + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer) { + super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx, transportTracer); this.state = checkNotNull(state, "transportState"); this.channel = checkNotNull(channel, "channel"); this.writeQueue = state.handler.getWriteQueue(); @@ -96,8 +102,10 @@ class NettyServerStream extends AbstractServerStream { @Override public void writeHeaders(Metadata headers) { - writeQueue.enqueue(new SendResponseHeadersCommand(transportState(), - Utils.convertServerHeaders(headers), false), + writeQueue.enqueue( + SendResponseHeadersCommand.createHeaders( + transportState(), + Utils.convertServerHeaders(headers)), true); } @@ -124,10 +132,11 @@ class NettyServerStream extends AbstractServerStream { } @Override - public void writeTrailers(Metadata trailers, boolean headersSent) { + public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); writeQueue.enqueue( - new SendResponseHeadersCommand(transportState(), http2Trailers, true), true); + SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), + true); } @Override @@ -143,9 +152,14 @@ class NettyServerStream extends AbstractServerStream { private final NettyServerHandler handler; private final EventLoop eventLoop; - public TransportState(NettyServerHandler handler, EventLoop eventLoop, Http2Stream http2Stream, - int maxMessageSize, StatsTraceContext statsTraceCtx) { - super(maxMessageSize, statsTraceCtx); + public TransportState( + NettyServerHandler handler, + EventLoop eventLoop, + Http2Stream http2Stream, + int maxMessageSize, + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer) { + super(maxMessageSize, statsTraceCtx, transportTracer); this.http2Stream = checkNotNull(http2Stream, "http2Stream"); this.handler = checkNotNull(handler, "handler"); this.eventLoop = eventLoop; diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index afb124c82..0e70349bc 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -19,17 +19,21 @@ package io.grpc.netty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.LogId; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.TransportTracer; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -65,6 +69,7 @@ class NettyServerTransport implements ServerTransport { private final boolean permitKeepAliveWithoutCalls; private final long permitKeepAliveTimeInNanos; private final List<ServerStreamTracer.Factory> streamTracerFactories; + private final TransportTracer transportTracer; NettyServerTransport( Channel channel, ProtocolNegotiator protocolNegotiator, @@ -89,6 +94,7 @@ class NettyServerTransport implements ServerTransport { this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; + this.transportTracer = new TransportTracer(); } public void start(ServerTransportListener listener) { @@ -168,12 +174,30 @@ class NettyServerTransport implements ServerTransport { } } + @Override + public Future<TransportTracer.Stats> getTransportStats() { + if (channel.eventLoop().inEventLoop()) { + // This is necessary, otherwise we will block forever if we get the future from inside + // the event loop. + SettableFuture<TransportTracer.Stats> result = SettableFuture.create(); + result.set(transportTracer.getStats()); + return result; + } + return channel.eventLoop().submit( + new Callable<TransportTracer.Stats>() { + @Override + public TransportTracer.Stats call() throws Exception { + return transportTracer.getStats(); + } + }); + } + /** * Creates the Netty handler to be used in the channel pipeline. */ private NettyServerHandler createHandler(ServerTransportListener transportListener) { return NettyServerHandler.newHandler( - transportListener, streamTracerFactories, maxStreams, + transportListener, streamTracerFactories, transportTracer, maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, diff --git a/netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java b/netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java index f969e0390..38536d693 100644 --- a/netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java +++ b/netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java @@ -16,7 +16,9 @@ package io.grpc.netty; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import io.grpc.Status; import io.netty.handler.codec.http2.Http2Headers; /** @@ -25,12 +27,22 @@ import io.netty.handler.codec.http2.Http2Headers; class SendResponseHeadersCommand extends WriteQueue.AbstractQueuedCommand { private final StreamIdHolder stream; private final Http2Headers headers; - private final boolean endOfStream; + private final Status status; - SendResponseHeadersCommand(StreamIdHolder stream, Http2Headers headers, boolean endOfStream) { + private SendResponseHeadersCommand(StreamIdHolder stream, Http2Headers headers, Status status) { this.stream = Preconditions.checkNotNull(stream, "stream"); this.headers = Preconditions.checkNotNull(headers, "headers"); - this.endOfStream = endOfStream; + this.status = status; + } + + static SendResponseHeadersCommand createHeaders(StreamIdHolder stream, Http2Headers headers) { + return new SendResponseHeadersCommand(stream, headers, null); + } + + static SendResponseHeadersCommand createTrailers( + StreamIdHolder stream, Http2Headers headers, Status status) { + return new SendResponseHeadersCommand( + stream, headers, Preconditions.checkNotNull(status, "status")); } StreamIdHolder stream() { @@ -42,7 +54,11 @@ class SendResponseHeadersCommand extends WriteQueue.AbstractQueuedCommand { } boolean endOfStream() { - return endOfStream; + return status != null; + } + + Status status() { + return status; } @Override @@ -53,17 +69,17 @@ class SendResponseHeadersCommand extends WriteQueue.AbstractQueuedCommand { SendResponseHeadersCommand thatCmd = (SendResponseHeadersCommand) that; return thatCmd.stream.equals(stream) && thatCmd.headers.equals(headers) - && thatCmd.endOfStream == endOfStream; + && thatCmd.status.equals(status); } @Override public String toString() { return getClass().getSimpleName() + "(stream=" + stream.id() + ", headers=" + headers - + ", endOfStream=" + endOfStream + ")"; + + ", status=" + status + ")"; } @Override public int hashCode() { - return stream.hashCode(); + return Objects.hashCode(stream, status); } } diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java index 98c166144..d0bbd993c 100644 --- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java +++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java @@ -32,6 +32,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.internal.FakeClock; import io.grpc.internal.MessageFramer; import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; import io.grpc.internal.WritableBuffer; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -239,15 +240,20 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> { protected ByteBuf grpcDataFrame(int streamId, boolean endStream, byte[] content) { final ByteBuf compressionFrame = Unpooled.buffer(content.length); - MessageFramer framer = new MessageFramer(new MessageFramer.Sink() { - @Override - public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { - if (frame != null) { - ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); - compressionFrame.writeBytes(bytebuf); - } - } - }, new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT), StatsTraceContext.NOOP); + TransportTracer noTransportTracer = null; + MessageFramer framer = new MessageFramer( + new MessageFramer.Sink() { + @Override + public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + if (frame != null) { + ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); + compressionFrame.writeBytes(bytebuf); + } + } + }, + new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT), + StatsTraceContext.NOOP, + noTransportTracer); framer.writePayload(new ByteArrayInputStream(content)); framer.flush(); ChannelHandlerContext ctx = newMockContext(); @@ -302,6 +308,12 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> { return captureWrite(ctx); } + protected final ByteBuf windowUpdate(int streamId, int delta) { + ChannelHandlerContext ctx = newMockContext(); + new DefaultHttp2FrameWriter().writeWindowUpdate(ctx, 0, delta, newPromise()); + return captureWrite(ctx); + } + protected final ChannelPromise newPromise() { return channel.newPromise(); } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 8854961de..008ba23ef 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -66,6 +66,7 @@ import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; +import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; import io.netty.buffer.ByteBuf; @@ -135,6 +136,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand private long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; + private TransportTracer transportTracer; private class ServerTransportListenerImpl implements ServerTransportListener { @@ -158,6 +160,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand assertNull("manualSetUp should not run more than once", handler()); MockitoAnnotations.initMocks(this); + transportTracer = new TransportTracer(); when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) .thenReturn(streamTracer); @@ -495,7 +498,9 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand keepAliveTimeoutInNanos = TimeUnit.MINUTES.toNanos(30L); manualSetUp(); + assertEquals(0, transportTracer.getStats().keepAlivesSent); fakeClock().forwardNanos(keepAliveTimeInNanos); + assertEquals(1, transportTracer.getStats().keepAlivesSent); verifyWrite().writePing(eq(ctx()), eq(false), eq(pingBuf), any(ChannelPromise.class)); @@ -548,7 +553,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand createStream(); Http2Headers headers = Utils.convertServerHeaders(new Metadata()); ChannelFuture future = enqueue( - new SendResponseHeadersCommand(stream.transportState(), headers, false)); + SendResponseHeadersCommand.createHeaders(stream.transportState(), headers)); future.get(); ByteBuf payload = handler().ctx().alloc().buffer(8); payload.writeLong(1); @@ -745,6 +750,34 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand assertTrue(!channel().isOpen()); } + @Test + public void transportTracer_windowSizeDefault() throws Exception { + manualSetUp(); + TransportTracer.Stats stats = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); + assertEquals(flowControlWindow, stats.localFlowControlWindow); + } + + @Test + public void transportTracer_windowSize() throws Exception { + flowControlWindow = 1048576; // 1MiB + manualSetUp(); + { + TransportTracer.Stats stats = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); + assertEquals(flowControlWindow, stats.localFlowControlWindow); + } + + { + ByteBuf serializedSettings = windowUpdate(0, 1000); + channelRead(serializedSettings); + TransportTracer.Stats stats = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000, + stats.remoteFlowControlWindow); + assertEquals(flowControlWindow, stats.localFlowControlWindow); + } + } + private void createStream() throws Exception { Http2Headers headers = new DefaultHttp2Headers() .method(HTTP_METHOD) @@ -775,7 +808,8 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand protected NettyServerHandler newHandler() { return NettyServerHandler.newHandler( frameReader(), frameWriter(), transportListener, - Arrays.asList(streamTracerFactory), maxConcurrentStreams, flowControlWindow, + Arrays.asList(streamTracerFactory), transportTracer, + maxConcurrentStreams, flowControlWindow, maxHeaderListSize, DEFAULT_MAX_MESSAGE_SIZE, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java index de5ef8626..3818ec994 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java @@ -42,6 +42,7 @@ import io.grpc.Status; import io.grpc.internal.ServerStreamListener; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; +import io.grpc.internal.TransportTracer; import io.grpc.netty.WriteQueue.QueuedCommand; import io.netty.buffer.EmptyByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; @@ -298,10 +299,12 @@ public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream }).when(writeQueue).enqueue(any(QueuedCommand.class), any(ChannelPromise.class), anyBoolean()); when(writeQueue.enqueue(any(QueuedCommand.class), anyBoolean())).thenReturn(future); StatsTraceContext statsTraceCtx = StatsTraceContext.NOOP; + TransportTracer transportTracer = new TransportTracer(); NettyServerStream.TransportState state = new NettyServerStream.TransportState( - handler, channel.eventLoop(), http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx); + handler, channel.eventLoop(), http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, + transportTracer); NettyServerStream stream = new NettyServerStream(channel, state, Attributes.EMPTY, - "test-authority", statsTraceCtx); + "test-authority", statsTraceCtx, transportTracer); stream.transportState().setListener(serverListener); state.onStreamAllocated(); verify(serverListener, atLeastOnce()).onReady(); diff --git a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java index 8d2a528a0..69afb11f1 100644 --- a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java @@ -42,6 +42,11 @@ public class NettyTransportTest extends AbstractTransportTest { .negotiationType(NegotiationType.PLAINTEXT) .buildTransportFactory(); + @Override + protected boolean haveTransportTracer() { + return true; + } + @After public void releaseClientFactory() { clientFactory.close(); |