aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
authorEric Anderson <ejona@google.com>2018-07-03 16:33:13 -0700
committerEric Anderson <ejona@google.com>2018-07-09 13:00:17 -0700
commitb92ea686470606b054317c5e4a288882820b4398 (patch)
tree95abb22e8c8eeb6428db4ca9b49442caffdf89fb /netty
parent9d6241eedce849493b86320a995fd27775f0bd40 (diff)
downloadgrpc-grpc-java-b92ea686470606b054317c5e4a288882820b4398.tar.gz
netty: Propagate EAG attributes to ProtocolNegotiator
This lets the NameResolver/LB coordinate with the negotiator, like is necessary with ALTS on GCP.
Diffstat (limited to 'netty')
-rw-r--r--netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java5
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java2
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientHandler.java23
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientTransport.java7
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java4
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java25
6 files changed, 54 insertions, 12 deletions
diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
index 771bcd249..4640693f2 100644
--- a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
+++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
@@ -77,4 +77,9 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler
public void notifyUnused() {
channelUnused.setSuccess(null);
}
+
+ /** Get the attributes of the EquivalentAddressGroup used to create this transport. */
+ public Attributes getEagAttributes() {
+ return Attributes.EMPTY;
+ }
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 3ff59ab15..a0d4525e0 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -530,7 +530,7 @@ public final class NettyChannelBuilder
dparams.getProtocolNegotiator(), flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
- tooManyPingsRunnable, transportTracer);
+ tooManyPingsRunnable, transportTracer, options.getEagAttributes());
return transport;
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 07ce8e94c..5c216d94e 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -102,6 +102,7 @@ class NettyClientHandler extends AbstractNettyHandler {
// Returns new unstarted stopwatches
private final Supplier<Stopwatch> stopwatchFactory;
private final TransportTracer transportTracer;
+ private final Attributes eagAttributes;
private WriteQueue clientWriteQueue;
private Http2Ping ping;
private Attributes attributes = Attributes.EMPTY;
@@ -114,7 +115,8 @@ class NettyClientHandler extends AbstractNettyHandler {
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
Runnable tooManyPingsRunnable,
- TransportTracer transportTracer) {
+ TransportTracer transportTracer,
+ Attributes eagAttributes) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
@@ -136,7 +138,8 @@ class NettyClientHandler extends AbstractNettyHandler {
maxHeaderListSize,
stopwatchFactory,
tooManyPingsRunnable,
- transportTracer);
+ transportTracer,
+ eagAttributes);
}
@VisibleForTesting
@@ -150,7 +153,8 @@ class NettyClientHandler extends AbstractNettyHandler {
int maxHeaderListSize,
Supplier<Stopwatch> stopwatchFactory,
Runnable tooManyPingsRunnable,
- TransportTracer transportTracer) {
+ TransportTracer transportTracer,
+ Attributes eagAttributes) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
@@ -158,6 +162,7 @@ class NettyClientHandler extends AbstractNettyHandler {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
+ Preconditions.checkNotNull(eagAttributes, "eagAttributes");
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
@@ -199,7 +204,8 @@ class NettyClientHandler extends AbstractNettyHandler {
keepAliveManager,
stopwatchFactory,
tooManyPingsRunnable,
- transportTracer);
+ transportTracer,
+ eagAttributes);
}
private NettyClientHandler(
@@ -210,12 +216,14 @@ class NettyClientHandler extends AbstractNettyHandler {
KeepAliveManager keepAliveManager,
Supplier<Stopwatch> stopwatchFactory,
final Runnable tooManyPingsRunnable,
- TransportTracer transportTracer) {
+ TransportTracer transportTracer,
+ Attributes eagAttributes) {
super(/* channelUnused= */ null, decoder, encoder, settings);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;
this.transportTracer = Preconditions.checkNotNull(transportTracer);
+ this.eagAttributes = eagAttributes;
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
@@ -416,6 +424,11 @@ class NettyClientHandler extends AbstractNettyHandler {
super.handleProtocolNegotiationCompleted(attributes, securityInfo);
}
+ @Override
+ public Attributes getEagAttributes() {
+ return eagAttributes;
+ }
+
Channelz.Security getSecurityInfo() {
return securityInfo;
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 7fbc1607d..894cb5ea9 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -91,6 +91,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private ClientTransportLifecycleManager lifecycleManager;
/** Since not thread-safe, may only be used from event loop. */
private final TransportTracer transportTracer;
+ private final Attributes eagAttributes;
NettyClientTransport(
SocketAddress address, Class<? extends Channel> channelType,
@@ -98,7 +99,7 @@ class NettyClientTransport implements ConnectionClientTransport {
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
- Runnable tooManyPingsRunnable, TransportTracer transportTracer) {
+ Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
@@ -115,6 +116,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.tooManyPingsRunnable =
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
+ this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
}
@Override
@@ -194,7 +196,8 @@ class NettyClientTransport implements ConnectionClientTransport {
maxHeaderListSize,
GrpcUtil.STOPWATCH_SUPPLIER,
tooManyPingsRunnable,
- transportTracer);
+ transportTracer,
+ eagAttributes);
NettyHandlerSettings.setAutoWindow(handler);
negotiationHandler = negotiator.newHandler(handler);
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index 2749cb4d8..8b6540bff 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -52,6 +52,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
@@ -720,7 +721,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
maxHeaderListSize,
stopwatchSupplier,
tooManyPingsRunnable,
- transportTracer);
+ transportTracer,
+ Attributes.EMPTY);
}
@Override
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 56320bbe2..d8d891e52 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -112,6 +112,7 @@ public class NettyClientTransportTest {
// Throwing is useless in this method, because Netty doesn't propagate the exception
@Override public void run() {}
};
+ private Attributes eagAttributes = Attributes.EMPTY;
private ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(SSL_CONTEXT);
@@ -174,7 +175,7 @@ public class NettyClientTransportTest {
address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
- tooManyPingsRunnable, new TransportTracer());
+ tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY);
transports.add(transport);
callMeMaybe(transport.start(clientTransportListener));
@@ -414,7 +415,7 @@ public class NettyClientTransportTest {
address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
- null, tooManyPingsRunnable, new TransportTracer());
+ null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY);
transports.add(transport);
// Should not throw
@@ -510,6 +511,22 @@ public class NettyClientTransportTest {
}
@Test
+ public void getEagAttributes_negotiatorHandler() throws Exception {
+ address = TestUtils.testServerAddress(12345);
+ authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
+
+ NoopProtocolNegotiator npn = new NoopProtocolNegotiator();
+ eagAttributes = Attributes.newBuilder()
+ .set(Attributes.Key.create("trash"), "value")
+ .build();
+ NettyClientTransport transport = newTransport(npn);
+ callMeMaybe(transport.start(clientTransportListener));
+
+ // EAG Attributes are available before the negotiation is complete
+ assertSame(eagAttributes, npn.grpcHandler.getEagAttributes());
+ }
+
+ @Test
public void clientStreamGetsAttributes() throws Exception {
startServer();
NettyClientTransport transport = newTransport(newNegotiator());
@@ -576,7 +593,7 @@ public class NettyClientTransportTest {
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
- new TransportTracer());
+ new TransportTracer(), eagAttributes);
transports.add(transport);
return transport;
}
@@ -796,10 +813,12 @@ public class NettyClientTransportTest {
}
private static class NoopProtocolNegotiator implements ProtocolNegotiator {
+ GrpcHttp2ConnectionHandler grpcHandler;
NoopHandler handler;
@Override
public Handler newHandler(final GrpcHttp2ConnectionHandler grpcHandler) {
+ this.grpcHandler = grpcHandler;
return handler = new NoopHandler(grpcHandler);
}
}