aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2017-11-02 15:44:21 -0700
committerGitHub <noreply@github.com>2017-11-02 15:44:21 -0700
commit2162cd07d5debe833bfdfe3512f30ed085140b91 (patch)
tree231d858d2eda19e3d21cf0e4c2dd18013e73e8d2 /netty
parent4c96ebd6d410923d78ab110c36bf8b3b1402126c (diff)
downloadgrpc-grpc-java-2162cd07d5debe833bfdfe3512f30ed085140b91.tar.gz
netty,core: add a TransportTracer class (#3454)
Diffstat (limited to 'netty')
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerHandler.java81
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerStream.java34
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerTransport.java26
-rw-r--r--netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java30
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java30
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java38
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java7
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyTransportTest.java5
8 files changed, 209 insertions, 42 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index f85e6b702..79b102362 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -40,6 +40,7 @@ import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.LogExceptionRunnable;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
@@ -62,6 +63,7 @@ import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FlowController;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
@@ -102,6 +104,7 @@ class NettyServerHandler extends AbstractNettyHandler {
private final long maxConnectionAgeInNanos;
private final long maxConnectionAgeGraceInNanos;
private final List<ServerStreamTracer.Factory> streamTracerFactories;
+ private final TransportTracer transportTracer;
private final KeepAliveEnforcer keepAliveEnforcer;
private Attributes attributes;
private Throwable connectionError;
@@ -118,6 +121,7 @@ class NettyServerHandler extends AbstractNettyHandler {
static NettyServerHandler newHandler(
ServerTransportListener transportListener,
List<ServerStreamTracer.Factory> streamTracerFactories,
+ TransportTracer transportTracer,
int maxStreams,
int flowControlWindow,
int maxHeaderListSize,
@@ -137,12 +141,22 @@ class NettyServerHandler extends AbstractNettyHandler {
Http2FrameWriter frameWriter =
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
return newHandler(
- frameReader, frameWriter, transportListener, streamTracerFactories,
- maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize,
- keepAliveTimeInNanos, keepAliveTimeoutInNanos,
+ frameReader,
+ frameWriter,
+ transportListener,
+ streamTracerFactories,
+ transportTracer,
+ maxStreams,
+ flowControlWindow,
+ maxHeaderListSize,
+ maxMessageSize,
+ keepAliveTimeInNanos,
+ keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
- maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
- permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
+ maxConnectionAgeInNanos,
+ maxConnectionAgeGraceInNanos,
+ permitKeepAliveWithoutCalls,
+ permitKeepAliveTimeInNanos);
}
@VisibleForTesting
@@ -150,6 +164,7 @@ class NettyServerHandler extends AbstractNettyHandler {
Http2FrameReader frameReader, Http2FrameWriter frameWriter,
ServerTransportListener transportListener,
List<ServerStreamTracer.Factory> streamTracerFactories,
+ TransportTracer transportTracer,
int maxStreams,
int flowControlWindow,
int maxHeaderListSize,
@@ -178,7 +193,6 @@ class NettyServerHandler extends AbstractNettyHandler {
// Create the local flow controller configured to auto-refill the connection window.
connection.local().flowController(
new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
-
frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
@@ -193,6 +207,7 @@ class NettyServerHandler extends AbstractNettyHandler {
connection,
transportListener,
streamTracerFactories,
+ transportTracer,
decoder, encoder, settings,
maxMessageSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
@@ -205,8 +220,10 @@ class NettyServerHandler extends AbstractNettyHandler {
final Http2Connection connection,
ServerTransportListener transportListener,
List<ServerStreamTracer.Factory> streamTracerFactories,
+ TransportTracer transportTracer,
Http2ConnectionDecoder decoder,
- Http2ConnectionEncoder encoder, Http2Settings settings,
+ Http2ConnectionEncoder encoder,
+ Http2Settings settings,
int maxMessageSize,
long keepAliveTimeInNanos,
long keepAliveTimeoutInNanos,
@@ -273,6 +290,7 @@ class NettyServerHandler extends AbstractNettyHandler {
streamKey = encoder.connection().newKey();
this.transportListener = checkNotNull(transportListener, "transportListener");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
+ this.transportTracer = checkNotNull(transportTracer, "transportTracer");
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
@@ -328,6 +346,25 @@ class NettyServerHandler extends AbstractNettyHandler {
keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
keepAliveManager.onTransportStarted();
}
+
+
+ if (transportTracer != null) {
+ assert encoder().connection().equals(decoder().connection());
+ final Http2Connection connection = encoder().connection();
+ transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
+ private final Http2FlowController local = connection.local().flowController();
+ private final Http2FlowController remote = connection.remote().flowController();
+
+ @Override
+ public TransportTracer.FlowControlWindows read() {
+ assert ctx.executor().inEventLoop();
+ return new TransportTracer.FlowControlWindows(
+ local.windowSize(connection.connectionStream()),
+ remote.windowSize(connection.connectionStream()));
+ }
+ });
+ }
+
super.handlerAdded(ctx);
}
@@ -354,10 +391,20 @@ class NettyServerHandler extends AbstractNettyHandler {
StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
NettyServerStream.TransportState state = new NettyServerStream.TransportState(
- this, ctx.channel().eventLoop(), http2Stream, maxMessageSize, statsTraceCtx);
+ this,
+ ctx.channel().eventLoop(),
+ http2Stream,
+ maxMessageSize,
+ statsTraceCtx,
+ transportTracer);
String authority = getOrUpdateAuthority((AsciiString)headers.authority());
- NettyServerStream stream = new NettyServerStream(ctx.channel(), state, attributes,
- authority, statsTraceCtx);
+ NettyServerStream stream = new NettyServerStream(
+ ctx.channel(),
+ state,
+ attributes,
+ authority,
+ statsTraceCtx,
+ transportTracer);
transportListener.streamCreated(stream, method, metadata);
state.onStreamAllocated();
http2Stream.setProperty(streamKey, state);
@@ -555,6 +602,7 @@ class NettyServerHandler extends AbstractNettyHandler {
}
if (cmd.endOfStream()) {
closeStreamWhenDone(promise, streamId);
+ transportTracer.reportStreamClosed(cmd.status());
}
encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
}
@@ -712,8 +760,19 @@ class NettyServerHandler extends AbstractNettyHandler {
@Override
public void ping() {
- encoder().writePing(ctx, false /* isAck */, KEEPALIVE_PING_BUF, ctx.newPromise());
+ ChannelFuture pingFuture = encoder().writePing(
+ ctx, false /* isAck */, KEEPALIVE_PING_BUF, ctx.newPromise());
ctx.flush();
+ if (transportTracer != null) {
+ pingFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ transportTracer.reportKeepAliveSent();
+ }
+ }
+ });
+ }
}
@Override
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java
index 869a38d42..37c91a50a 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java
@@ -23,6 +23,7 @@ import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.AbstractServerStream;
import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
@@ -48,9 +49,14 @@ class NettyServerStream extends AbstractServerStream {
private final Attributes attributes;
private final String authority;
- public NettyServerStream(Channel channel, TransportState state, Attributes transportAttrs,
- String authority, StatsTraceContext statsTraceCtx) {
- super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx);
+ public NettyServerStream(
+ Channel channel,
+ TransportState state,
+ Attributes transportAttrs,
+ String authority,
+ StatsTraceContext statsTraceCtx,
+ TransportTracer transportTracer) {
+ super(new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx, transportTracer);
this.state = checkNotNull(state, "transportState");
this.channel = checkNotNull(channel, "channel");
this.writeQueue = state.handler.getWriteQueue();
@@ -96,8 +102,10 @@ class NettyServerStream extends AbstractServerStream {
@Override
public void writeHeaders(Metadata headers) {
- writeQueue.enqueue(new SendResponseHeadersCommand(transportState(),
- Utils.convertServerHeaders(headers), false),
+ writeQueue.enqueue(
+ SendResponseHeadersCommand.createHeaders(
+ transportState(),
+ Utils.convertServerHeaders(headers)),
true);
}
@@ -124,10 +132,11 @@ class NettyServerStream extends AbstractServerStream {
}
@Override
- public void writeTrailers(Metadata trailers, boolean headersSent) {
+ public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
writeQueue.enqueue(
- new SendResponseHeadersCommand(transportState(), http2Trailers, true), true);
+ SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
+ true);
}
@Override
@@ -143,9 +152,14 @@ class NettyServerStream extends AbstractServerStream {
private final NettyServerHandler handler;
private final EventLoop eventLoop;
- public TransportState(NettyServerHandler handler, EventLoop eventLoop, Http2Stream http2Stream,
- int maxMessageSize, StatsTraceContext statsTraceCtx) {
- super(maxMessageSize, statsTraceCtx);
+ public TransportState(
+ NettyServerHandler handler,
+ EventLoop eventLoop,
+ Http2Stream http2Stream,
+ int maxMessageSize,
+ StatsTraceContext statsTraceCtx,
+ TransportTracer transportTracer) {
+ super(maxMessageSize, statsTraceCtx, transportTracer);
this.http2Stream = checkNotNull(http2Stream, "http2Stream");
this.handler = checkNotNull(handler, "handler");
this.eventLoop = eventLoop;
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
index afb124c82..0e70349bc 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
@@ -19,17 +19,21 @@ package io.grpc.netty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.LogId;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
+import io.grpc.internal.TransportTracer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -65,6 +69,7 @@ class NettyServerTransport implements ServerTransport {
private final boolean permitKeepAliveWithoutCalls;
private final long permitKeepAliveTimeInNanos;
private final List<ServerStreamTracer.Factory> streamTracerFactories;
+ private final TransportTracer transportTracer;
NettyServerTransport(
Channel channel, ProtocolNegotiator protocolNegotiator,
@@ -89,6 +94,7 @@ class NettyServerTransport implements ServerTransport {
this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
+ this.transportTracer = new TransportTracer();
}
public void start(ServerTransportListener listener) {
@@ -168,12 +174,30 @@ class NettyServerTransport implements ServerTransport {
}
}
+ @Override
+ public Future<TransportTracer.Stats> getTransportStats() {
+ if (channel.eventLoop().inEventLoop()) {
+ // This is necessary, otherwise we will block forever if we get the future from inside
+ // the event loop.
+ SettableFuture<TransportTracer.Stats> result = SettableFuture.create();
+ result.set(transportTracer.getStats());
+ return result;
+ }
+ return channel.eventLoop().submit(
+ new Callable<TransportTracer.Stats>() {
+ @Override
+ public TransportTracer.Stats call() throws Exception {
+ return transportTracer.getStats();
+ }
+ });
+ }
+
/**
* Creates the Netty handler to be used in the channel pipeline.
*/
private NettyServerHandler createHandler(ServerTransportListener transportListener) {
return NettyServerHandler.newHandler(
- transportListener, streamTracerFactories, maxStreams,
+ transportListener, streamTracerFactories, transportTracer, maxStreams,
flowControlWindow, maxHeaderListSize, maxMessageSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
diff --git a/netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java b/netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java
index f969e0390..38536d693 100644
--- a/netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java
+++ b/netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java
@@ -16,7 +16,9 @@
package io.grpc.netty;
+import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import io.grpc.Status;
import io.netty.handler.codec.http2.Http2Headers;
/**
@@ -25,12 +27,22 @@ import io.netty.handler.codec.http2.Http2Headers;
class SendResponseHeadersCommand extends WriteQueue.AbstractQueuedCommand {
private final StreamIdHolder stream;
private final Http2Headers headers;
- private final boolean endOfStream;
+ private final Status status;
- SendResponseHeadersCommand(StreamIdHolder stream, Http2Headers headers, boolean endOfStream) {
+ private SendResponseHeadersCommand(StreamIdHolder stream, Http2Headers headers, Status status) {
this.stream = Preconditions.checkNotNull(stream, "stream");
this.headers = Preconditions.checkNotNull(headers, "headers");
- this.endOfStream = endOfStream;
+ this.status = status;
+ }
+
+ static SendResponseHeadersCommand createHeaders(StreamIdHolder stream, Http2Headers headers) {
+ return new SendResponseHeadersCommand(stream, headers, null);
+ }
+
+ static SendResponseHeadersCommand createTrailers(
+ StreamIdHolder stream, Http2Headers headers, Status status) {
+ return new SendResponseHeadersCommand(
+ stream, headers, Preconditions.checkNotNull(status, "status"));
}
StreamIdHolder stream() {
@@ -42,7 +54,11 @@ class SendResponseHeadersCommand extends WriteQueue.AbstractQueuedCommand {
}
boolean endOfStream() {
- return endOfStream;
+ return status != null;
+ }
+
+ Status status() {
+ return status;
}
@Override
@@ -53,17 +69,17 @@ class SendResponseHeadersCommand extends WriteQueue.AbstractQueuedCommand {
SendResponseHeadersCommand thatCmd = (SendResponseHeadersCommand) that;
return thatCmd.stream.equals(stream)
&& thatCmd.headers.equals(headers)
- && thatCmd.endOfStream == endOfStream;
+ && thatCmd.status.equals(status);
}
@Override
public String toString() {
return getClass().getSimpleName() + "(stream=" + stream.id() + ", headers=" + headers
- + ", endOfStream=" + endOfStream + ")";
+ + ", status=" + status + ")";
}
@Override
public int hashCode() {
- return stream.hashCode();
+ return Objects.hashCode(stream, status);
}
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
index 98c166144..d0bbd993c 100644
--- a/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
+++ b/netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
@@ -32,6 +32,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.internal.FakeClock;
import io.grpc.internal.MessageFramer;
import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -239,15 +240,20 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
protected ByteBuf grpcDataFrame(int streamId, boolean endStream, byte[] content) {
final ByteBuf compressionFrame = Unpooled.buffer(content.length);
- MessageFramer framer = new MessageFramer(new MessageFramer.Sink() {
- @Override
- public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
- if (frame != null) {
- ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
- compressionFrame.writeBytes(bytebuf);
- }
- }
- }, new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT), StatsTraceContext.NOOP);
+ TransportTracer noTransportTracer = null;
+ MessageFramer framer = new MessageFramer(
+ new MessageFramer.Sink() {
+ @Override
+ public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
+ if (frame != null) {
+ ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
+ compressionFrame.writeBytes(bytebuf);
+ }
+ }
+ },
+ new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT),
+ StatsTraceContext.NOOP,
+ noTransportTracer);
framer.writePayload(new ByteArrayInputStream(content));
framer.flush();
ChannelHandlerContext ctx = newMockContext();
@@ -302,6 +308,12 @@ public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {
return captureWrite(ctx);
}
+ protected final ByteBuf windowUpdate(int streamId, int delta) {
+ ChannelHandlerContext ctx = newMockContext();
+ new DefaultHttp2FrameWriter().writeWindowUpdate(ctx, 0, delta, newPromise());
+ return captureWrite(ctx);
+ }
+
protected final ChannelPromise newPromise() {
return channel.newPromise();
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
index 8854961de..008ba23ef 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
@@ -66,6 +66,7 @@ import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
+import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.netty.buffer.ByteBuf;
@@ -135,6 +136,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
private long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
private long keepAliveTimeInNanos = DEFAULT_SERVER_KEEPALIVE_TIME_NANOS;
private long keepAliveTimeoutInNanos = DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS;
+ private TransportTracer transportTracer;
private class ServerTransportListenerImpl implements ServerTransportListener {
@@ -158,6 +160,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
assertNull("manualSetUp should not run more than once", handler());
MockitoAnnotations.initMocks(this);
+ transportTracer = new TransportTracer();
when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class)))
.thenReturn(streamTracer);
@@ -495,7 +498,9 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
keepAliveTimeoutInNanos = TimeUnit.MINUTES.toNanos(30L);
manualSetUp();
+ assertEquals(0, transportTracer.getStats().keepAlivesSent);
fakeClock().forwardNanos(keepAliveTimeInNanos);
+ assertEquals(1, transportTracer.getStats().keepAlivesSent);
verifyWrite().writePing(eq(ctx()), eq(false), eq(pingBuf), any(ChannelPromise.class));
@@ -548,7 +553,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
createStream();
Http2Headers headers = Utils.convertServerHeaders(new Metadata());
ChannelFuture future = enqueue(
- new SendResponseHeadersCommand(stream.transportState(), headers, false));
+ SendResponseHeadersCommand.createHeaders(stream.transportState(), headers));
future.get();
ByteBuf payload = handler().ctx().alloc().buffer(8);
payload.writeLong(1);
@@ -745,6 +750,34 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
assertTrue(!channel().isOpen());
}
+ @Test
+ public void transportTracer_windowSizeDefault() throws Exception {
+ manualSetUp();
+ TransportTracer.Stats stats = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
+ assertEquals(flowControlWindow, stats.localFlowControlWindow);
+ }
+
+ @Test
+ public void transportTracer_windowSize() throws Exception {
+ flowControlWindow = 1048576; // 1MiB
+ manualSetUp();
+ {
+ TransportTracer.Stats stats = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow);
+ assertEquals(flowControlWindow, stats.localFlowControlWindow);
+ }
+
+ {
+ ByteBuf serializedSettings = windowUpdate(0, 1000);
+ channelRead(serializedSettings);
+ TransportTracer.Stats stats = transportTracer.getStats();
+ assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000,
+ stats.remoteFlowControlWindow);
+ assertEquals(flowControlWindow, stats.localFlowControlWindow);
+ }
+ }
+
private void createStream() throws Exception {
Http2Headers headers = new DefaultHttp2Headers()
.method(HTTP_METHOD)
@@ -775,7 +808,8 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
protected NettyServerHandler newHandler() {
return NettyServerHandler.newHandler(
frameReader(), frameWriter(), transportListener,
- Arrays.asList(streamTracerFactory), maxConcurrentStreams, flowControlWindow,
+ Arrays.asList(streamTracerFactory), transportTracer,
+ maxConcurrentStreams, flowControlWindow,
maxHeaderListSize, DEFAULT_MAX_MESSAGE_SIZE,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java
index de5ef8626..3818ec994 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java
@@ -42,6 +42,7 @@ import io.grpc.Status;
import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.StreamListener;
+import io.grpc.internal.TransportTracer;
import io.grpc.netty.WriteQueue.QueuedCommand;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
@@ -298,10 +299,12 @@ public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream
}).when(writeQueue).enqueue(any(QueuedCommand.class), any(ChannelPromise.class), anyBoolean());
when(writeQueue.enqueue(any(QueuedCommand.class), anyBoolean())).thenReturn(future);
StatsTraceContext statsTraceCtx = StatsTraceContext.NOOP;
+ TransportTracer transportTracer = new TransportTracer();
NettyServerStream.TransportState state = new NettyServerStream.TransportState(
- handler, channel.eventLoop(), http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx);
+ handler, channel.eventLoop(), http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx,
+ transportTracer);
NettyServerStream stream = new NettyServerStream(channel, state, Attributes.EMPTY,
- "test-authority", statsTraceCtx);
+ "test-authority", statsTraceCtx, transportTracer);
stream.transportState().setListener(serverListener);
state.onStreamAllocated();
verify(serverListener, atLeastOnce()).onReady();
diff --git a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
index 8d2a528a0..69afb11f1 100644
--- a/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyTransportTest.java
@@ -42,6 +42,11 @@ public class NettyTransportTest extends AbstractTransportTest {
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
+ @Override
+ protected boolean haveTransportTracer() {
+ return true;
+ }
+
@After
public void releaseClientFactory() {
clientFactory.close();