diff options
author | zpencer <spencerfang@google.com> | 2017-11-10 15:55:21 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-10 15:55:21 -0800 |
commit | d0a84ae5b86e6b7f031c9829a48d1e56361b145f (patch) | |
tree | bf5a2ec4a8473cd1642a18dfae27da2c68d81ee7 /netty | |
parent | 30fb84479048110bf53c571c3b77899a308ed9ff (diff) | |
download | grpc-grpc-java-d0a84ae5b86e6b7f031c9829a48d1e56361b145f.tar.gz |
core,netty: wire TransportTracer to netty client (#3705)
Diffstat (limited to 'netty')
7 files changed, 217 insertions, 93 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index aea4643ed..f680cd8e9 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -31,6 +31,7 @@ import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; import io.grpc.internal.KeepAliveManager; +import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; @@ -52,6 +53,7 @@ import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FlowController; import io.netty.handler.codec.http2.Http2FrameAdapter; import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2FrameReader; @@ -97,6 +99,7 @@ class NettyClientHandler extends AbstractNettyHandler { private final KeepAliveManager keepAliveManager; // Returns new unstarted stopwatches private final Supplier<Stopwatch> stopwatchFactory; + private final TransportTracer transportTracer; private WriteQueue clientWriteQueue; private Http2Ping ping; private Attributes attributes = Attributes.EMPTY; @@ -107,7 +110,8 @@ class NettyClientHandler extends AbstractNettyHandler { int flowControlWindow, int maxHeaderListSize, Supplier<Stopwatch> stopwatchFactory, - Runnable tooManyPingsRunnable) { + Runnable tooManyPingsRunnable, + TransportTracer transportTracer) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -128,12 +132,13 @@ class NettyClientHandler extends AbstractNettyHandler { flowControlWindow, maxHeaderListSize, stopwatchFactory, - tooManyPingsRunnable); + tooManyPingsRunnable, + transportTracer); } @VisibleForTesting static NettyClientHandler newHandler( - Http2Connection connection, + final Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager, @@ -141,7 +146,8 @@ class NettyClientHandler extends AbstractNettyHandler { int flowControlWindow, int maxHeaderListSize, Supplier<Stopwatch> stopwatchFactory, - Runnable tooManyPingsRunnable) { + Runnable tooManyPingsRunnable, + TransportTracer transportTracer) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); @@ -164,6 +170,18 @@ class NettyClientHandler extends AbstractNettyHandler { Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); + transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() { + final Http2FlowController local = connection.local().flowController(); + final Http2FlowController remote = connection.remote().flowController(); + + @Override + public TransportTracer.FlowControlWindows read() { + return new TransportTracer.FlowControlWindows( + local.windowSize(connection.connectionStream()), + remote.windowSize(connection.connectionStream())); + } + }); + Http2Settings settings = new Http2Settings(); settings.pushEnabled(false); settings.initialWindowSize(flowControlWindow); @@ -177,7 +195,8 @@ class NettyClientHandler extends AbstractNettyHandler { lifecycleManager, keepAliveManager, stopwatchFactory, - tooManyPingsRunnable); + tooManyPingsRunnable, + transportTracer); } private NettyClientHandler( @@ -187,11 +206,13 @@ class NettyClientHandler extends AbstractNettyHandler { ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, Supplier<Stopwatch> stopwatchFactory, - final Runnable tooManyPingsRunnable) { + final Runnable tooManyPingsRunnable, + TransportTracer transportTracer) { super(decoder, encoder, settings); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; + this.transportTracer = Preconditions.checkNotNull(transportTracer); // Set the frame listener on the decoder. decoder().frameListener(new FrameListener()); @@ -550,7 +571,9 @@ class NettyClientHandler extends AbstractNettyHandler { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { + if (future.isSuccess()) { + transportTracer.reportKeepAliveSent(); + } else { Throwable cause = future.cause(); if (cause instanceof ClosedChannelException) { cause = lifecycleManager.getShutdownThrowable(); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 76eba4414..b97270ad7 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -32,6 +32,7 @@ import io.grpc.Status; import io.grpc.internal.AbstractClientStream; import io.grpc.internal.Http2ClientStreamTransportState; import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; import io.grpc.internal.WritableBuffer; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; @@ -61,12 +62,19 @@ class NettyClientStream extends AbstractClientStream { private final AsciiString userAgent; NettyClientStream( - TransportState state, MethodDescriptor<?, ?> method, Metadata headers, - Channel channel, AsciiString authority, AsciiString scheme, AsciiString userAgent, - StatsTraceContext statsTraceCtx) { - super(new NettyWritableBufferAllocator(channel.alloc()), + TransportState state, + MethodDescriptor<?, ?> method, + Metadata headers, + Channel channel, + AsciiString authority, + AsciiString scheme, + AsciiString userAgent, + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer) { + super( + new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx, - null, + transportTracer, headers, useGet(method)); this.state = checkNotNull(state, "transportState"); @@ -149,7 +157,7 @@ class NettyClientStream extends AbstractClientStream { @Override public void writeFrame( - WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { + WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) { Preconditions.checkArgument(numMessages >= 0); ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf(); final int numBytes = bytebuf.readableBytes(); @@ -167,6 +175,7 @@ class NettyClientStream extends AbstractClientStream { // Remove the bytes from outbound flow control, optionally notifying // the client that they can send more bytes. transportState().onSentBytes(numBytes); + NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages); } } }), flush); @@ -205,9 +214,13 @@ class NettyClientStream extends AbstractClientStream { private int id; private Http2Stream http2Stream; - public TransportState(NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize, - StatsTraceContext statsTraceCtx) { - super(maxMessageSize, statsTraceCtx, /*transportTracer=*/null); + public TransportState( + NettyClientHandler handler, + EventLoop eventLoop, + int maxMessageSize, + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer) { + super(maxMessageSize, statsTraceCtx, transportTracer); this.handler = checkNotNull(handler, "handler"); this.eventLoop = checkNotNull(eventLoop, "eventLoop"); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0d84a92af..b977a0f70 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -50,6 +50,7 @@ import io.netty.util.AsciiString; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Future; import javax.annotation.Nullable; @@ -84,6 +85,8 @@ class NettyClientTransport implements ConnectionClientTransport { private Status statusExplainingWhyTheChannelIsNull; /** Since not thread-safe, may only be used from event loop. */ private ClientTransportLifecycleManager lifecycleManager; + /** Since not thread-safe, may only be used from event loop. */ + private final TransportTracer transportTracer = new TransportTracer(); NettyClientTransport( SocketAddress address, Class<? extends Channel> channelType, @@ -146,15 +149,25 @@ class NettyClientTransport implements ConnectionClientTransport { } StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers); return new NettyClientStream( - new NettyClientStream.TransportState(handler, channel.eventLoop(), maxMessageSize, - statsTraceCtx) { + new NettyClientStream.TransportState( + handler, + channel.eventLoop(), + maxMessageSize, + statsTraceCtx, + transportTracer) { @Override protected Status statusFromFailedFuture(ChannelFuture f) { return NettyClientTransport.this.statusFromFailedFuture(f); } }, - method, headers, channel, authority, negotiationHandler.scheme(), userAgent, - statsTraceCtx); + method, + headers, + channel, + authority, + negotiationHandler.scheme(), + userAgent, + statsTraceCtx, + transportTracer); } @SuppressWarnings("unchecked") @@ -175,7 +188,8 @@ class NettyClientTransport implements ConnectionClientTransport { flowControlWindow, maxHeaderListSize, GrpcUtil.STOPWATCH_SUPPLIER, - tooManyPingsRunnable); + tooManyPingsRunnable, + transportTracer); NettyHandlerSettings.setAutoWindow(handler); negotiationHandler = negotiator.newHandler(handler); @@ -295,9 +309,20 @@ class NettyClientTransport implements ConnectionClientTransport { @Override public Future<TransportTracer.Stats> getTransportStats() { - SettableFuture<TransportTracer.Stats> ret = SettableFuture.create(); - ret.set(null); - return ret; + if (channel.eventLoop().inEventLoop()) { + // This is necessary, otherwise we will block forever if we get the future from inside + // the event loop. + SettableFuture<TransportTracer.Stats> result = SettableFuture.create(); + result.set(transportTracer.getStats()); + return result; + } + return channel.eventLoop().submit( + new Callable<TransportTracer.Stats>() { + @Override + public TransportTracer.Stats call() throws Exception { + return transportTracer.getStats(); + } + }); } @VisibleForTesting diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index b3f97bdba..a3097ad40 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -26,7 +26,6 @@ import static io.grpc.netty.Utils.STATUS_OK; import static io.grpc.netty.Utils.TE_HEADER; import static io.grpc.netty.Utils.TE_TRAILERS; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -59,6 +58,7 @@ import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; +import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; @@ -104,7 +104,6 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand private NettyClientStream.TransportState streamTransportState; private Http2Headers grpcHeaders; private long nanoTime; // backs a ticker, for testing ping round-trip time measurement - private int flowControlWindow = DEFAULT_WINDOW_SIZE; private int maxHeaderListSize = Integer.MAX_VALUE; private int streamId = 3; private ClientTransportLifecycleManager lifecycleManager; @@ -125,6 +124,11 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand private final Queue<InputStream> streamListenerMessageQueue = new LinkedList<InputStream>(); + @Override + protected void manualSetUp() throws Exception { + setUp(); + } + /** * Set up for test. */ @@ -156,8 +160,11 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand } initChannel(new GrpcHttp2ClientHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE)); - streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(), - DEFAULT_MAX_MESSAGE_SIZE); + streamTransportState = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); streamTransportState.setListener(streamListener); grpcHeaders = new DefaultHttp2Headers() @@ -462,14 +469,20 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState)); assertEquals(3, streamTransportState.id()); - streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(), - DEFAULT_MAX_MESSAGE_SIZE); + streamTransportState = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); streamTransportState.setListener(streamListener); enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState)); assertEquals(5, streamTransportState.id()); - streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(), - DEFAULT_MAX_MESSAGE_SIZE); + streamTransportState = new TransportStateImpl( + handler(), + channel().eventLoop(), + DEFAULT_MAX_MESSAGE_SIZE, + transportTracer); streamTransportState.setListener(streamListener); enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState)); assertEquals(7, streamTransportState.id()); @@ -501,10 +514,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand @Test public void ping() throws Exception { PingCallbackImpl callback1 = new PingCallbackImpl(); + assertEquals(0, transportTracer.getStats().keepAlivesSent); sendPing(callback1); + assertEquals(1, transportTracer.getStats().keepAlivesSent); // add'l ping will be added as listener to outstanding operation PingCallbackImpl callback2 = new PingCallbackImpl(); sendPing(callback2); + assertEquals(1, transportTracer.getStats().keepAlivesSent); ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class); verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(), @@ -534,7 +550,9 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand // now that previous ping is done, next request starts a new operation callback1 = new PingCallbackImpl(); + assertEquals(1, transportTracer.getStats().keepAlivesSent); sendPing(callback1); + assertEquals(2, transportTracer.getStats().keepAlivesSent); assertEquals(0, callback1.invocationCount); } @@ -550,6 +568,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand assertTrue(callback.failureCause instanceof StatusException); assertEquals(Status.Code.UNAVAILABLE, ((StatusException) callback.failureCause).getStatus().getCode()); + // A failed ping is still counted + assertEquals(1, transportTracer.getStats().keepAlivesSent); } @Test @@ -558,7 +578,9 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand handler().setAutoTuneFlowControl(true); PingCallbackImpl callback = new PingCallbackImpl(); + assertEquals(0, transportTracer.getStats().keepAlivesSent); sendPing(callback); + assertEquals(1, transportTracer.getStats().keepAlivesSent); ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class); verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(), any(ChannelPromise.class)); ByteBuf payload = captor.getValue(); @@ -575,6 +597,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand assertEquals(1, handler().flowControlPing().getPingReturn()); assertEquals(1, callback.invocationCount); + assertEquals(1, transportTracer.getStats().keepAlivesSent); } @Override @@ -658,7 +681,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand flowControlWindow, maxHeaderListSize, stopwatchSupplier, - tooManyPingsRunnable); + tooManyPingsRunnable, + transportTracer); } @Override @@ -690,8 +714,12 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand } private static class TransportStateImpl extends NettyClientStream.TransportState { - public TransportStateImpl(NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize) { - super(handler, eventLoop, maxMessageSize, StatsTraceContext.NOOP); + public TransportStateImpl( + NettyClientHandler handler, + EventLoop eventLoop, + int maxMessageSize, + TransportTracer transportTracer) { + super(handler, eventLoop, maxMessageSize, StatsTraceContext.NOOP, transportTracer); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java index a868efd7f..13bec7336 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java @@ -51,6 +51,7 @@ import io.grpc.internal.ClientStreamListener; import io.grpc.internal.GrpcUtil; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; +import io.grpc.internal.TransportTracer; import io.grpc.netty.WriteQueue.QueuedCommand; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -98,6 +99,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream .setResponseMarshaller(marshaller) .build(); + private final TransportTracer transportTracer = new TransportTracer(); /** Set up for test. */ @Before @@ -388,8 +390,14 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream listener = mock(ClientStreamListener.class); stream = new NettyClientStream(new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), - methodDescriptor, new Metadata(), channel, AsciiString.of("localhost"), - AsciiString.of("http"), AsciiString.of("agent"), StatsTraceContext.NOOP); + methodDescriptor, + new Metadata(), + channel, + AsciiString.of("localhost"), + AsciiString.of("http"), + AsciiString.of("agent"), + StatsTraceContext.NOOP, + transportTracer); stream.start(listener); stream().transportState().setId(STREAM_ID); verify(listener, never()).onReady(); @@ -407,9 +415,16 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream Mockito.reset(writeQueue); when(writeQueue.enqueue(any(QueuedCommand.class), any(boolean.class))).thenReturn(future); - stream = new NettyClientStream(new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), - methodDescriptor, new Metadata(), channel, AsciiString.of("localhost"), - AsciiString.of("http"), AsciiString.of("good agent"), StatsTraceContext.NOOP); + stream = new NettyClientStream( + new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), + methodDescriptor, + new Metadata(), + channel, + AsciiString.of("localhost"), + AsciiString.of("http"), + AsciiString.of("good agent"), + StatsTraceContext.NOOP, + transportTracer); stream.start(listener); ArgumentCaptor<CreateStreamCommand> cmdCap = ArgumentCaptor.forClass(CreateStreamCommand.class); @@ -430,9 +445,15 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream .setSafe(true) .build(); NettyClientStream stream = new NettyClientStream( - new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), descriptor, new Metadata(), - channel, AsciiString.of("localhost"), AsciiString.of("http"), AsciiString.of("agent"), - StatsTraceContext.NOOP); + new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), + descriptor, + new Metadata(), + channel, + AsciiString.of("localhost"), + AsciiString.of("http"), + AsciiString.of("agent"), + StatsTraceContext.NOOP, + transportTracer); stream.start(listener); stream.transportState().setId(STREAM_ID); stream.transportState().setHttp2Stream(http2Stream); @@ -468,9 +489,15 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream }).when(writeQueue).enqueue(any(QueuedCommand.class), any(ChannelPromise.class), anyBoolean()); when(writeQueue.enqueue(any(QueuedCommand.class), anyBoolean())).thenReturn(future); NettyClientStream stream = new NettyClientStream( - new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), methodDescriptor, new Metadata(), - channel, AsciiString.of("localhost"), AsciiString.of("http"), AsciiString.of("agent"), - StatsTraceContext.NOOP); + new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), + methodDescriptor, + new Metadata(), + channel, + AsciiString.of("localhost"), + AsciiString.of("http"), + AsciiString.of("agent"), + StatsTraceContext.NOOP, + transportTracer); stream.start(listener); stream.transportState().setId(STREAM_ID); stream.transportState().setHttp2Stream(http2Stream); @@ -508,7 +535,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream private class TransportStateImpl extends NettyClientStream.TransportState { public TransportStateImpl(NettyClientHandler handler, int maxMessageSize) { - super(handler, channel.eventLoop(), maxMessageSize, StatsTraceContext.NOOP); + super(handler, channel.eventLoop(), maxMessageSize, StatsTraceContext.NOOP, transportTracer); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java index 5bf5cbe40..4556eb566 100644 --- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java +++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java @@ -17,6 +17,7 @@ package io.grpc.netty; import static com.google.common.base.Charsets.UTF_8; +import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertEquals; import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.Matchers.any; @@ -48,6 +49,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; +import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Exception; @@ -97,6 +99,9 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> { */ protected void manualSetUp() throws Exception {} + protected final TransportTracer transportTracer = new TransportTracer(); + protected int flowControlWindow = DEFAULT_WINDOW_SIZE; + private final FakeClock fakeClock = new FakeClock(); FakeClock fakeClock() { @@ -451,4 +456,53 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> { assertEquals(maxWindow, localFlowController.initialWindowSize(connectionStream)); } + @Test + public void transportTracer_windowSizeDefault() throws Exception { + manualSetUp(); + TransportTracer.Stats stats = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); + assertEquals(flowControlWindow, stats.localFlowControlWindow); + } + + @Test + public void transportTracer_windowSize() throws Exception { + flowControlWindow = 1024 * 1024; + manualSetUp(); + TransportTracer.Stats stats = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); + assertEquals(flowControlWindow, stats.localFlowControlWindow); + } + + @Test + public void transportTracer_windowUpdate_remote() throws Exception { + manualSetUp(); + TransportTracer.Stats before = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.localFlowControlWindow); + + ByteBuf serializedSettings = windowUpdate(0, 1000); + channelRead(serializedSettings); + TransportTracer.Stats after = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000, + after.remoteFlowControlWindow); + assertEquals(flowControlWindow, after.localFlowControlWindow); + } + + @Test + public void transportTracer_windowUpdate_local() throws Exception { + manualSetUp(); + TransportTracer.Stats before = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); + assertEquals(flowControlWindow, before.localFlowControlWindow); + + // If the window size is below a certain threshold, netty will wait to apply the update. + // Use a large increment to be sure that it exceeds the threshold. + connection().local().flowController().incrementWindowSize( + connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE); + + TransportTracer.Stats after = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow); + assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE, + connection().local().flowController().windowSize(connection().connectionStream())); + } } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index cd8ee55e6..acf2710db 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -29,7 +29,6 @@ import static io.grpc.netty.Utils.HTTP_METHOD; import static io.grpc.netty.Utils.TE_HEADER; import static io.grpc.netty.Utils.TE_TRAILERS; import static io.netty.buffer.Unpooled.directBuffer; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -66,7 +65,6 @@ import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; -import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; import io.netty.buffer.ByteBuf; @@ -126,7 +124,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand final Queue<InputStream> streamListenerMessageQueue = new LinkedList<InputStream>(); - private int flowControlWindow = DEFAULT_WINDOW_SIZE; private int maxConcurrentStreams = Integer.MAX_VALUE; private int maxHeaderListSize = Integer.MAX_VALUE; private boolean permitKeepAliveWithoutCalls = true; @@ -136,7 +133,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand private long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS; private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS; - private TransportTracer transportTracer = new TransportTracer(); private class ServerTransportListenerImpl implements ServerTransportListener { @@ -766,48 +762,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand assertTrue(!channel().isOpen()); } - @Test - public void transportTracer_windowSizeDefault() throws Exception { - manualSetUp(); - TransportTracer.Stats stats = transportTracer.getStats(); - assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); - assertEquals(flowControlWindow, stats.localFlowControlWindow); - } - - @Test - public void transportTracer_windowUpdate_remote() throws Exception { - flowControlWindow = 1048576; // 1MiB - manualSetUp(); - TransportTracer.Stats before = transportTracer.getStats(); - assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); - assertEquals(flowControlWindow, before.localFlowControlWindow); - - ByteBuf serializedSettings = windowUpdate(0, 1000); - channelRead(serializedSettings); - TransportTracer.Stats after = transportTracer.getStats(); - assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000, - after.remoteFlowControlWindow); - assertEquals(flowControlWindow, after.localFlowControlWindow); - } - - @Test - public void transportTracer_windowUpdate_local() throws Exception { - manualSetUp(); - TransportTracer.Stats before = transportTracer.getStats(); - assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); - assertEquals(flowControlWindow, before.localFlowControlWindow); - - // If the window size is below a certain threshold, netty will wait to apply the update. - // Use a large increment to be sure that it exceeds the threshold. - connection().local().flowController().incrementWindowSize( - connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE); - - TransportTracer.Stats after = transportTracer.getStats(); - assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow); - assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE, - after.localFlowControlWindow); - } - private void createStream() throws Exception { Http2Headers headers = new DefaultHttp2Headers() .method(HTTP_METHOD) |