aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2017-11-10 15:55:21 -0800
committerGitHub <noreply@github.com>2017-11-10 15:55:21 -0800
commitd0a84ae5b86e6b7f031c9829a48d1e56361b145f (patch)
treebf5a2ec4a8473cd1642a18dfae27da2c68d81ee7 /netty
parent30fb84479048110bf53c571c3b77899a308ed9ff (diff)
downloadgrpc-grpc-java-d0a84ae5b86e6b7f031c9829a48d1e56361b145f.tar.gz
core,netty: wire TransportTracer to netty client (#3705)
Diffstat (limited to 'netty')
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientHandler.java37
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientStream.java31
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientTransport.java41
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java50
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java51
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java54
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java46
7 files changed, 217 insertions, 93 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index aea4643ed..f680cd8e9 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -31,6 +31,7 @@ import io.grpc.internal.ClientTransport.PingCallback;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
import io.grpc.internal.KeepAliveManager;
+import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
@@ -52,6 +53,7 @@ import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FlowController;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
@@ -97,6 +99,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private final KeepAliveManager keepAliveManager;
// Returns new unstarted stopwatches
private final Supplier<Stopwatch> stopwatchFactory;
+ private final TransportTracer transportTracer;
private WriteQueue clientWriteQueue;
private Http2Ping ping;
private Attributes attributes = Attributes.EMPTY;
@@ -107,7 +110,8 @@ class NettyClientHandler extends AbstractNettyHandler {
int flowControlWindow,
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
- Runnable tooManyPingsRunnable) {
+ Runnable tooManyPingsRunnable,
+ TransportTracer transportTracer) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
@@ -128,12 +132,13 @@ class NettyClientHandler extends AbstractNettyHandler {
flowControlWindow,
maxHeaderListSize,
stopwatchFactory,
- tooManyPingsRunnable);
+ tooManyPingsRunnable,
+ transportTracer);
}
@VisibleForTesting
static NettyClientHandler newHandler(
- Http2Connection connection,
+ final Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
ClientTransportLifecycleManager lifecycleManager,
@@ -141,7 +146,8 @@ class NettyClientHandler extends AbstractNettyHandler {
int flowControlWindow,
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
- Runnable tooManyPingsRunnable) {
+ Runnable tooManyPingsRunnable,
+ TransportTracer transportTracer) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
@@ -164,6 +170,18 @@ class NettyClientHandler extends AbstractNettyHandler {
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
frameReader);
+ transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
+ final Http2FlowController local = connection.local().flowController();
+ final Http2FlowController remote = connection.remote().flowController();
+
+ @Override
+ public TransportTracer.FlowControlWindows read() {
+ return new TransportTracer.FlowControlWindows(
+ local.windowSize(connection.connectionStream()),
+ remote.windowSize(connection.connectionStream()));
+ }
+ });
+
Http2Settings settings = new Http2Settings();
settings.pushEnabled(false);
settings.initialWindowSize(flowControlWindow);
@@ -177,7 +195,8 @@ class NettyClientHandler extends AbstractNettyHandler {
lifecycleManager,
keepAliveManager,
stopwatchFactory,
- tooManyPingsRunnable);
+ tooManyPingsRunnable,
+ transportTracer);
}
private NettyClientHandler(
@@ -187,11 +206,13 @@ class NettyClientHandler extends AbstractNettyHandler {
ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager,
Supplier<Stopwatch> stopwatchFactory,
- final Runnable tooManyPingsRunnable) {
+ final Runnable tooManyPingsRunnable,
+ TransportTracer transportTracer) {
super(decoder, encoder, settings);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
+ this.transportTracer = Preconditions.checkNotNull(transportTracer);
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
@@ -550,7 +571,9 @@ class NettyClientHandler extends AbstractNettyHandler {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
+ if (future.isSuccess()) {
+ transportTracer.reportKeepAliveSent();
+ } else {
Throwable cause = future.cause();
if (cause instanceof ClosedChannelException) {
cause = lifecycleManager.getShutdownThrowable();
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java
index 76eba4414..b97270ad7 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java
@@ -32,6 +32,7 @@ import io.grpc.Status;
import io.grpc.internal.AbstractClientStream;
import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
@@ -61,12 +62,19 @@ class NettyClientStream extends AbstractClientStream {
private final AsciiString userAgent;
NettyClientStream(
- TransportState state, MethodDescriptor<?, ?> method, Metadata headers,
- Channel channel, AsciiString authority, AsciiString scheme, AsciiString userAgent,
- StatsTraceContext statsTraceCtx) {
- super(new NettyWritableBufferAllocator(channel.alloc()),
+ TransportState state,
+ MethodDescriptor<?, ?> method,
+ Metadata headers,
+ Channel channel,
+ AsciiString authority,
+ AsciiString scheme,
+ AsciiString userAgent,
+ StatsTraceContext statsTraceCtx,
+ TransportTracer transportTracer) {
+ super(
+ new NettyWritableBufferAllocator(channel.alloc()),
statsTraceCtx,
- null,
+ transportTracer,
headers,
useGet(method));
this.state = checkNotNull(state, "transportState");
@@ -149,7 +157,7 @@ class NettyClientStream extends AbstractClientStream {
@Override
public void writeFrame(
- WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
+ WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
Preconditions.checkArgument(numMessages >= 0);
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
final int numBytes = bytebuf.readableBytes();
@@ -167,6 +175,7 @@ class NettyClientStream extends AbstractClientStream {
// Remove the bytes from outbound flow control, optionally notifying
// the client that they can send more bytes.
transportState().onSentBytes(numBytes);
+ NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages);
}
}
}), flush);
@@ -205,9 +214,13 @@ class NettyClientStream extends AbstractClientStream {
private int id;
private Http2Stream http2Stream;
- public TransportState(NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize,
- StatsTraceContext statsTraceCtx) {
- super(maxMessageSize, statsTraceCtx, /*transportTracer=*/null);
+ public TransportState(
+ NettyClientHandler handler,
+ EventLoop eventLoop,
+ int maxMessageSize,
+ StatsTraceContext statsTraceCtx,
+ TransportTracer transportTracer) {
+ super(maxMessageSize, statsTraceCtx, transportTracer);
this.handler = checkNotNull(handler, "handler");
this.eventLoop = checkNotNull(eventLoop, "eventLoop");
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 0d84a92af..b977a0f70 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -50,6 +50,7 @@ import io.netty.util.AsciiString;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
@@ -84,6 +85,8 @@ class NettyClientTransport implements ConnectionClientTransport {
private Status statusExplainingWhyTheChannelIsNull;
/** Since not thread-safe, may only be used from event loop. */
private ClientTransportLifecycleManager lifecycleManager;
+ /** Since not thread-safe, may only be used from event loop. */
+ private final TransportTracer transportTracer = new TransportTracer();
NettyClientTransport(
SocketAddress address, Class<? extends Channel> channelType,
@@ -146,15 +149,25 @@ class NettyClientTransport implements ConnectionClientTransport {
}
StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
return new NettyClientStream(
- new NettyClientStream.TransportState(handler, channel.eventLoop(), maxMessageSize,
- statsTraceCtx) {
+ new NettyClientStream.TransportState(
+ handler,
+ channel.eventLoop(),
+ maxMessageSize,
+ statsTraceCtx,
+ transportTracer) {
@Override
protected Status statusFromFailedFuture(ChannelFuture f) {
return NettyClientTransport.this.statusFromFailedFuture(f);
}
},
- method, headers, channel, authority, negotiationHandler.scheme(), userAgent,
- statsTraceCtx);
+ method,
+ headers,
+ channel,
+ authority,
+ negotiationHandler.scheme(),
+ userAgent,
+ statsTraceCtx,
+ transportTracer);
}
@SuppressWarnings("unchecked")
@@ -175,7 +188,8 @@ class NettyClientTransport implements ConnectionClientTransport {
flowControlWindow,
maxHeaderListSize,
GrpcUtil.STOPWATCH_SUPPLIER,
- tooManyPingsRunnable);
+ tooManyPingsRunnable,
+ transportTracer);
NettyHandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);
@@ -295,9 +309,20 @@ class NettyClientTransport implements ConnectionClientTransport {
@Override
public Future<TransportTracer.Stats> getTransportStats() {
- SettableFuture<TransportTracer.Stats> ret = SettableFuture.create();
- ret.set(null);
- return ret;
+ if (channel.eventLoop().inEventLoop()) {
+ // This is necessary, otherwise we will block forever if we get the future from inside
+ // the event loop.
+ SettableFuture<TransportTracer.Stats> result = SettableFuture.create();
+ result.set(transportTracer.getStats());
+ return result;
+ }
+ return channel.eventLoop().submit(
+ new Callable<TransportTracer.Stats>() {
+ @Override
+ public TransportTracer.Stats call() throws Exception {
+ return transportTracer.getStats();
+ }
+ });
}
@VisibleForTesting
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index b3f97bdba..a3097ad40 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -26,7 +26,6 @@ import static io.grpc.netty.Utils.STATUS_OK;
import static io.grpc.netty.Utils.TE_HEADER;
import static io.grpc.netty.Utils.TE_TRAILERS;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
-import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -59,6 +58,7 @@ import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
+import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
@@ -104,7 +104,6 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
private NettyClientStream.TransportState streamTransportState;
private Http2Headers grpcHeaders;
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
- private int flowControlWindow = DEFAULT_WINDOW_SIZE;
private int maxHeaderListSize = Integer.MAX_VALUE;
private int streamId = 3;
private ClientTransportLifecycleManager lifecycleManager;
@@ -125,6 +124,11 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
private final Queue<InputStream> streamListenerMessageQueue = new LinkedList<InputStream>();
+ @Override
+ protected void manualSetUp() throws Exception {
+ setUp();
+ }
+
/**
* Set up for test.
*/
@@ -156,8 +160,11 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
}
initChannel(new GrpcHttp2ClientHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));
- streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(),
- DEFAULT_MAX_MESSAGE_SIZE);
+ streamTransportState = new TransportStateImpl(
+ handler(),
+ channel().eventLoop(),
+ DEFAULT_MAX_MESSAGE_SIZE,
+ transportTracer);
streamTransportState.setListener(streamListener);
grpcHeaders = new DefaultHttp2Headers()
@@ -462,14 +469,20 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(3, streamTransportState.id());
- streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(),
- DEFAULT_MAX_MESSAGE_SIZE);
+ streamTransportState = new TransportStateImpl(
+ handler(),
+ channel().eventLoop(),
+ DEFAULT_MAX_MESSAGE_SIZE,
+ transportTracer);
streamTransportState.setListener(streamListener);
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(5, streamTransportState.id());
- streamTransportState = new TransportStateImpl(handler(), channel().eventLoop(),
- DEFAULT_MAX_MESSAGE_SIZE);
+ streamTransportState = new TransportStateImpl(
+ handler(),
+ channel().eventLoop(),
+ DEFAULT_MAX_MESSAGE_SIZE,
+ transportTracer);
streamTransportState.setListener(streamListener);
enqueue(new CreateStreamCommand(grpcHeaders, streamTransportState));
assertEquals(7, streamTransportState.id());
@@ -501,10 +514,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
@Test
public void ping() throws Exception {
PingCallbackImpl callback1 = new PingCallbackImpl();
+ assertEquals(0, transportTracer.getStats().keepAlivesSent);
sendPing(callback1);
+ assertEquals(1, transportTracer.getStats().keepAlivesSent);
// add'l ping will be added as listener to outstanding operation
PingCallbackImpl callback2 = new PingCallbackImpl();
sendPing(callback2);
+ assertEquals(1, transportTracer.getStats().keepAlivesSent);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(),
@@ -534,7 +550,9 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
// now that previous ping is done, next request starts a new operation
callback1 = new PingCallbackImpl();
+ assertEquals(1, transportTracer.getStats().keepAlivesSent);
sendPing(callback1);
+ assertEquals(2, transportTracer.getStats().keepAlivesSent);
assertEquals(0, callback1.invocationCount);
}
@@ -550,6 +568,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
+ // A failed ping is still counted
+ assertEquals(1, transportTracer.getStats().keepAlivesSent);
}
@Test
@@ -558,7 +578,9 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
handler().setAutoTuneFlowControl(true);
PingCallbackImpl callback = new PingCallbackImpl();
+ assertEquals(0, transportTracer.getStats().keepAlivesSent);
sendPing(callback);
+ assertEquals(1, transportTracer.getStats().keepAlivesSent);
ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(), any(ChannelPromise.class));
ByteBuf payload = captor.getValue();
@@ -575,6 +597,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
assertEquals(1, handler().flowControlPing().getPingReturn());
assertEquals(1, callback.invocationCount);
+ assertEquals(1, transportTracer.getStats().keepAlivesSent);
}
@Override
@@ -658,7 +681,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
flowControlWindow,
maxHeaderListSize,
stopwatchSupplier,
- tooManyPingsRunnable);
+ tooManyPingsRunnable,
+ transportTracer);
}
@Override
@@ -690,8 +714,12 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
}
private static class TransportStateImpl extends NettyClientStream.TransportState {
- public TransportStateImpl(NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize) {
- super(handler, eventLoop, maxMessageSize, StatsTraceContext.NOOP);
+ public TransportStateImpl(
+ NettyClientHandler handler,
+ EventLoop eventLoop,
+ int maxMessageSize,
+ TransportTracer transportTracer) {
+ super(handler, eventLoop, maxMessageSize, StatsTraceContext.NOOP, transportTracer);
}
@Override
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
index a868efd7f..13bec7336 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
@@ -51,6 +51,7 @@ import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
+import io.grpc.internal.TransportTracer;
import io.grpc.netty.WriteQueue.QueuedCommand;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -98,6 +99,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
.setResponseMarshaller(marshaller)
.build();
+ private final TransportTracer transportTracer = new TransportTracer();
/** Set up for test. */
@Before
@@ -388,8 +390,14 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
listener = mock(ClientStreamListener.class);
stream = new NettyClientStream(new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
- methodDescriptor, new Metadata(), channel, AsciiString.of("localhost"),
- AsciiString.of("http"), AsciiString.of("agent"), StatsTraceContext.NOOP);
+ methodDescriptor,
+ new Metadata(),
+ channel,
+ AsciiString.of("localhost"),
+ AsciiString.of("http"),
+ AsciiString.of("agent"),
+ StatsTraceContext.NOOP,
+ transportTracer);
stream.start(listener);
stream().transportState().setId(STREAM_ID);
verify(listener, never()).onReady();
@@ -407,9 +415,16 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
Mockito.reset(writeQueue);
when(writeQueue.enqueue(any(QueuedCommand.class), any(boolean.class))).thenReturn(future);
- stream = new NettyClientStream(new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
- methodDescriptor, new Metadata(), channel, AsciiString.of("localhost"),
- AsciiString.of("http"), AsciiString.of("good agent"), StatsTraceContext.NOOP);
+ stream = new NettyClientStream(
+ new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
+ methodDescriptor,
+ new Metadata(),
+ channel,
+ AsciiString.of("localhost"),
+ AsciiString.of("http"),
+ AsciiString.of("good agent"),
+ StatsTraceContext.NOOP,
+ transportTracer);
stream.start(listener);
ArgumentCaptor<CreateStreamCommand> cmdCap = ArgumentCaptor.forClass(CreateStreamCommand.class);
@@ -430,9 +445,15 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
.setSafe(true)
.build();
NettyClientStream stream = new NettyClientStream(
- new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), descriptor, new Metadata(),
- channel, AsciiString.of("localhost"), AsciiString.of("http"), AsciiString.of("agent"),
- StatsTraceContext.NOOP);
+ new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
+ descriptor,
+ new Metadata(),
+ channel,
+ AsciiString.of("localhost"),
+ AsciiString.of("http"),
+ AsciiString.of("agent"),
+ StatsTraceContext.NOOP,
+ transportTracer);
stream.start(listener);
stream.transportState().setId(STREAM_ID);
stream.transportState().setHttp2Stream(http2Stream);
@@ -468,9 +489,15 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
}).when(writeQueue).enqueue(any(QueuedCommand.class), any(ChannelPromise.class), anyBoolean());
when(writeQueue.enqueue(any(QueuedCommand.class), anyBoolean())).thenReturn(future);
NettyClientStream stream = new NettyClientStream(
- new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE), methodDescriptor, new Metadata(),
- channel, AsciiString.of("localhost"), AsciiString.of("http"), AsciiString.of("agent"),
- StatsTraceContext.NOOP);
+ new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
+ methodDescriptor,
+ new Metadata(),
+ channel,
+ AsciiString.of("localhost"),
+ AsciiString.of("http"),
+ AsciiString.of("agent"),
+ StatsTraceContext.NOOP,
+ transportTracer);
stream.start(listener);
stream.transportState().setId(STREAM_ID);
stream.transportState().setHttp2Stream(http2Stream);
@@ -508,7 +535,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
private class TransportStateImpl extends NettyClientStream.TransportState {
public TransportStateImpl(NettyClientHandler handler, int maxMessageSize) {
- super(handler, channel.eventLoop(), maxMessageSize, StatsTraceContext.NOOP);
+ super(handler, channel.eventLoop(), maxMessageSize, StatsTraceContext.NOOP, transportTracer);
}
@Override
diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
index 5bf5cbe40..4556eb566 100644
--- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
+++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
@@ -17,6 +17,7 @@
package io.grpc.netty;
import static com.google.common.base.Charsets.UTF_8;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Matchers.any;
@@ -48,6 +49,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Exception;
@@ -97,6 +99,9 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
*/
protected void manualSetUp() throws Exception {}
+ protected final TransportTracer transportTracer = new TransportTracer();
+ protected int flowControlWindow = DEFAULT_WINDOW_SIZE;
+
private final FakeClock fakeClock = new FakeClock();
FakeClock fakeClock() {
@@ -451,4 +456,53 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
assertEquals(maxWindow, localFlowController.initialWindowSize(connectionStream));
}
+ @Test
+ public void transportTracer_windowSizeDefault() throws Exception {
+ manualSetUp();
+ TransportTracer.Stats stats = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
+ assertEquals(flowControlWindow, stats.localFlowControlWindow);
+ }
+
+ @Test
+ public void transportTracer_windowSize() throws Exception {
+ flowControlWindow = 1024 * 1024;
+ manualSetUp();
+ TransportTracer.Stats stats = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
+ assertEquals(flowControlWindow, stats.localFlowControlWindow);
+ }
+
+ @Test
+ public void transportTracer_windowUpdate_remote() throws Exception {
+ manualSetUp();
+ TransportTracer.Stats before = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.localFlowControlWindow);
+
+ ByteBuf serializedSettings = windowUpdate(0, 1000);
+ channelRead(serializedSettings);
+ TransportTracer.Stats after = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000,
+ after.remoteFlowControlWindow);
+ assertEquals(flowControlWindow, after.localFlowControlWindow);
+ }
+
+ @Test
+ public void transportTracer_windowUpdate_local() throws Exception {
+ manualSetUp();
+ TransportTracer.Stats before = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
+ assertEquals(flowControlWindow, before.localFlowControlWindow);
+
+ // If the window size is below a certain threshold, netty will wait to apply the update.
+ // Use a large increment to be sure that it exceeds the threshold.
+ connection().local().flowController().incrementWindowSize(
+ connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE);
+
+ TransportTracer.Stats after = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow);
+ assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE,
+ connection().local().flowController().windowSize(connection().connectionStream()));
+ }
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
index cd8ee55e6..acf2710db 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
@@ -29,7 +29,6 @@ import static io.grpc.netty.Utils.HTTP_METHOD;
import static io.grpc.netty.Utils.TE_HEADER;
import static io.grpc.netty.Utils.TE_TRAILERS;
import static io.netty.buffer.Unpooled.directBuffer;
-import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -66,7 +65,6 @@ import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
-import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.netty.buffer.ByteBuf;
@@ -126,7 +124,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
final Queue<InputStream> streamListenerMessageQueue = new LinkedList<InputStream>();
- private int flowControlWindow = DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = Integer.MAX_VALUE;
private int maxHeaderListSize = Integer.MAX_VALUE;
private boolean permitKeepAliveWithoutCalls = true;
@@ -136,7 +133,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
private long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
- private TransportTracer transportTracer = new TransportTracer();
private class ServerTransportListenerImpl implements ServerTransportListener {
@@ -766,48 +762,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
assertTrue(!channel().isOpen());
}
- @Test
- public void transportTracer_windowSizeDefault() throws Exception {
- manualSetUp();
- TransportTracer.Stats stats = transportTracer.getStats();
- assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
- assertEquals(flowControlWindow, stats.localFlowControlWindow);
- }
-
- @Test
- public void transportTracer_windowUpdate_remote() throws Exception {
- flowControlWindow = 1048576; // 1MiB
- manualSetUp();
- TransportTracer.Stats before = transportTracer.getStats();
- assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
- assertEquals(flowControlWindow, before.localFlowControlWindow);
-
- ByteBuf serializedSettings = windowUpdate(0, 1000);
- channelRead(serializedSettings);
- TransportTracer.Stats after = transportTracer.getStats();
- assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000,
- after.remoteFlowControlWindow);
- assertEquals(flowControlWindow, after.localFlowControlWindow);
- }
-
- @Test
- public void transportTracer_windowUpdate_local() throws Exception {
- manualSetUp();
- TransportTracer.Stats before = transportTracer.getStats();
- assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow);
- assertEquals(flowControlWindow, before.localFlowControlWindow);
-
- // If the window size is below a certain threshold, netty will wait to apply the update.
- // Use a large increment to be sure that it exceeds the threshold.
- connection().local().flowController().incrementWindowSize(
- connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE);
-
- TransportTracer.Stats after = transportTracer.getStats();
- assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow);
- assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE,
- after.localFlowControlWindow);
- }
-
private void createStream() throws Exception {
Http2Headers headers = new DefaultHttp2Headers()
.method(HTTP_METHOD)