diff options
author | Eric Anderson <ejona@google.com> | 2018-07-03 16:33:13 -0700 |
---|---|---|
committer | Eric Anderson <ejona@google.com> | 2018-07-09 13:00:17 -0700 |
commit | b92ea686470606b054317c5e4a288882820b4398 (patch) | |
tree | 95abb22e8c8eeb6428db4ca9b49442caffdf89fb /netty | |
parent | 9d6241eedce849493b86320a995fd27775f0bd40 (diff) | |
download | grpc-grpc-java-b92ea686470606b054317c5e4a288882820b4398.tar.gz |
netty: Propagate EAG attributes to ProtocolNegotiator
This lets the NameResolver/LB coordinate with the negotiator, like is
necessary with ALTS on GCP.
Diffstat (limited to 'netty')
6 files changed, 54 insertions, 12 deletions
diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java index 771bcd249..4640693f2 100644 --- a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java +++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java @@ -77,4 +77,9 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler public void notifyUnused() { channelUnused.setSuccess(null); } + + /** Get the attributes of the EquivalentAddressGroup used to create this transport. */ + public Attributes getEagAttributes() { + return Attributes.EMPTY; + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 3ff59ab15..a0d4525e0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -530,7 +530,7 @@ public final class NettyChannelBuilder dparams.getProtocolNegotiator(), flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(), - tooManyPingsRunnable, transportTracer); + tooManyPingsRunnable, transportTracer, options.getEagAttributes()); return transport; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 07ce8e94c..5c216d94e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -102,6 +102,7 @@ class NettyClientHandler extends AbstractNettyHandler { // Returns new unstarted stopwatches private final Supplier<Stopwatch> stopwatchFactory; private final TransportTracer transportTracer; + private final Attributes eagAttributes; private WriteQueue clientWriteQueue; private Http2Ping ping; private Attributes attributes = Attributes.EMPTY; @@ -114,7 +115,8 @@ class NettyClientHandler extends AbstractNettyHandler { int maxHeaderListSize, Supplier<Stopwatch> stopwatchFactory, Runnable tooManyPingsRunnable, - TransportTracer transportTracer) { + TransportTracer transportTracer, + Attributes eagAttributes) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -136,7 +138,8 @@ class NettyClientHandler extends AbstractNettyHandler { maxHeaderListSize, stopwatchFactory, tooManyPingsRunnable, - transportTracer); + transportTracer, + eagAttributes); } @VisibleForTesting @@ -150,7 +153,8 @@ class NettyClientHandler extends AbstractNettyHandler { int maxHeaderListSize, Supplier<Stopwatch> stopwatchFactory, Runnable tooManyPingsRunnable, - TransportTracer transportTracer) { + TransportTracer transportTracer, + Attributes eagAttributes) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); @@ -158,6 +162,7 @@ class NettyClientHandler extends AbstractNettyHandler { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory"); Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); + Preconditions.checkNotNull(eagAttributes, "eagAttributes"); Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class); frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); @@ -199,7 +204,8 @@ class NettyClientHandler extends AbstractNettyHandler { keepAliveManager, stopwatchFactory, tooManyPingsRunnable, - transportTracer); + transportTracer, + eagAttributes); } private NettyClientHandler( @@ -210,12 +216,14 @@ class NettyClientHandler extends AbstractNettyHandler { KeepAliveManager keepAliveManager, Supplier<Stopwatch> stopwatchFactory, final Runnable tooManyPingsRunnable, - TransportTracer transportTracer) { + TransportTracer transportTracer, + Attributes eagAttributes) { super(/* channelUnused= */ null, decoder, encoder, settings); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; this.transportTracer = Preconditions.checkNotNull(transportTracer); + this.eagAttributes = eagAttributes; // Set the frame listener on the decoder. decoder().frameListener(new FrameListener()); @@ -416,6 +424,11 @@ class NettyClientHandler extends AbstractNettyHandler { super.handleProtocolNegotiationCompleted(attributes, securityInfo); } + @Override + public Attributes getEagAttributes() { + return eagAttributes; + } + Channelz.Security getSecurityInfo() { return securityInfo; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 7fbc1607d..894cb5ea9 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -91,6 +91,7 @@ class NettyClientTransport implements ConnectionClientTransport { private ClientTransportLifecycleManager lifecycleManager; /** Since not thread-safe, may only be used from event loop. */ private final TransportTracer transportTracer; + private final Attributes eagAttributes; NettyClientTransport( SocketAddress address, Class<? extends Channel> channelType, @@ -98,7 +99,7 @@ class NettyClientTransport implements ConnectionClientTransport { ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, - Runnable tooManyPingsRunnable, TransportTracer transportTracer) { + Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes) { this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); this.address = Preconditions.checkNotNull(address, "address"); this.group = Preconditions.checkNotNull(group, "group"); @@ -115,6 +116,7 @@ class NettyClientTransport implements ConnectionClientTransport { this.tooManyPingsRunnable = Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); + this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); } @Override @@ -194,7 +196,8 @@ class NettyClientTransport implements ConnectionClientTransport { maxHeaderListSize, GrpcUtil.STOPWATCH_SUPPLIER, tooManyPingsRunnable, - transportTracer); + transportTracer, + eagAttributes); NettyHandlerSettings.setAutoWindow(handler); negotiationHandler = negotiator.newHandler(handler); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 2749cb4d8..8b6540bff 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -52,6 +52,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.grpc.Attributes; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; @@ -720,7 +721,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand maxHeaderListSize, stopwatchSupplier, tooManyPingsRunnable, - transportTracer); + transportTracer, + Attributes.EMPTY); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 56320bbe2..d8d891e52 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -112,6 +112,7 @@ public class NettyClientTransportTest { // Throwing is useless in this method, because Netty doesn't propagate the exception @Override public void run() {} }; + private Attributes eagAttributes = Attributes.EMPTY; private ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(SSL_CONTEXT); @@ -174,7 +175,7 @@ public class NettyClientTransportTest { address, NioSocketChannel.class, channelOptions, group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, - tooManyPingsRunnable, new TransportTracer()); + tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); @@ -414,7 +415,7 @@ public class NettyClientTransportTest { address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, - null, tooManyPingsRunnable, new TransportTracer()); + null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY); transports.add(transport); // Should not throw @@ -510,6 +511,22 @@ public class NettyClientTransportTest { } @Test + public void getEagAttributes_negotiatorHandler() throws Exception { + address = TestUtils.testServerAddress(12345); + authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); + + NoopProtocolNegotiator npn = new NoopProtocolNegotiator(); + eagAttributes = Attributes.newBuilder() + .set(Attributes.Key.create("trash"), "value") + .build(); + NettyClientTransport transport = newTransport(npn); + callMeMaybe(transport.start(clientTransportListener)); + + // EAG Attributes are available before the negotiation is complete + assertSame(eagAttributes, npn.grpcHandler.getEagAttributes()); + } + + @Test public void clientStreamGetsAttributes() throws Exception { startServer(); NettyClientTransport transport = newTransport(newNegotiator()); @@ -576,7 +593,7 @@ public class NettyClientTransportTest { DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, - new TransportTracer()); + new TransportTracer(), eagAttributes); transports.add(transport); return transport; } @@ -796,10 +813,12 @@ public class NettyClientTransportTest { } private static class NoopProtocolNegotiator implements ProtocolNegotiator { + GrpcHttp2ConnectionHandler grpcHandler; NoopHandler handler; @Override public Handler newHandler(final GrpcHttp2ConnectionHandler grpcHandler) { + this.grpcHandler = grpcHandler; return handler = new NoopHandler(grpcHandler); } } |