diff options
author | android-build-team Robot <android-build-team-robot@google.com> | 2021-01-13 23:59:13 +0000 |
---|---|---|
committer | android-build-team Robot <android-build-team-robot@google.com> | 2021-01-13 23:59:13 +0000 |
commit | 7e0e0c30cfa46eeb5793ba46f1a273a8081b2100 (patch) | |
tree | 71cca5090003ca0b46a97cbc43b52e94103376bd /netty | |
parent | 9b4f1a6db99b7cb7d46320e25e32317853b500dc (diff) | |
parent | 599cbd6b784405f6a751b91d3bcc9536e4776e69 (diff) | |
download | grpc-grpc-java-7e0e0c30cfa46eeb5793ba46f1a273a8081b2100.tar.gz |
Snap for 7080740 from 599cbd6b784405f6a751b91d3bcc9536e4776e69 to mainline-tethering-releaseandroid-mainline-11.0.0_r43android-mainline-11.0.0_r24android11-mainline-tethering-release
Change-Id: Ifa5b02fa99dd9e1d614f8e97c85c132d3b81d948
Diffstat (limited to 'netty')
-rw-r--r-- | netty/shaded/Android.bp | 15 | ||||
-rw-r--r-- | netty/shaded/grpc-netty-shaded-1.14.0.jar | bin | 6401211 -> 0 bytes | |||
-rw-r--r-- | netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java | 29 | ||||
-rw-r--r-- | netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java | 184 | ||||
-rw-r--r-- | netty/src/main/java/io/grpc/netty/NettyClientTransport.java | 20 | ||||
-rw-r--r-- | netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java | 7 | ||||
-rw-r--r-- | netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java | 35 | ||||
-rw-r--r-- | netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java | 32 |
8 files changed, 174 insertions, 148 deletions
diff --git a/netty/shaded/Android.bp b/netty/shaded/Android.bp index 346371f1e..5aecf42ba 100644 --- a/netty/shaded/Android.bp +++ b/netty/shaded/Android.bp @@ -1,5 +1,5 @@ // -// Copyright (C) 2018 The Android Open Source Project +// Copyright (C) 2020 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,10 +13,15 @@ // See the License for the specific language governing permissions and // limitations under the License. // -// TODO: Build from source instead -java_import_host { + +// The below module imports the Netty shaded binary from the Maven repository +// since it's not trivial to build from source due to the large number of +// external dependencies and missing build-system support. +// +// WARNING: The artifact version must match the source to avoid runtime issues. +java_library_host { name: "grpc-java-netty-shaded", - jars: [ - "grpc-netty-shaded-1.14.0.jar", + static_libs: [ + "grpc-netty-shaded-1.16.1-jar", ], } diff --git a/netty/shaded/grpc-netty-shaded-1.14.0.jar b/netty/shaded/grpc-netty-shaded-1.14.0.jar Binary files differdeleted file mode 100644 index 230b7de90..000000000 --- a/netty/shaded/grpc-netty-shaded-1.14.0.jar +++ /dev/null diff --git a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java index 4711af97b..bbceb364e 100644 --- a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java @@ -18,8 +18,6 @@ package io.grpc.netty; import io.grpc.Internal; import io.grpc.internal.ClientTransportFactory; -import io.grpc.internal.ProxyParameters; -import java.net.SocketAddress; /** * Internal {@link NettyChannelBuilder} accessor. This is intended for usage internal to the gRPC @@ -39,28 +37,17 @@ public final class InternalNettyChannelBuilder { channelBuilder.overrideAuthorityChecker(authorityChecker); } - /** - * Interface to create netty dynamic parameters. - */ - public interface TransportCreationParamsFilterFactory - extends NettyChannelBuilder.TransportCreationParamsFilterFactory { - @Override - TransportCreationParamsFilter create( - SocketAddress targetServerAddress, String authority, String userAgent, - ProxyParameters proxy); - } + /** A class that provides a Netty handler to control protocol negotiation. */ + public interface ProtocolNegotiatorFactory + extends NettyChannelBuilder.ProtocolNegotiatorFactory {} /** - * {@link TransportCreationParamsFilter} are those that may depend on late-known information about - * a client transport. This interface can be used to dynamically alter params based on the - * params of {@code ClientTransportFactory#newClientTransport}. + * Sets the {@link ProtocolNegotiatorFactory} to be used. Overrides any specified negotiation type + * and {@code SslContext}. */ - public interface TransportCreationParamsFilter - extends NettyChannelBuilder.TransportCreationParamsFilter {} - - public static void setDynamicTransportParamsFactory( - NettyChannelBuilder builder, TransportCreationParamsFilterFactory factory) { - builder.setDynamicParamsFactory(factory); + public static void setProtocolNegotiatorFactory( + NettyChannelBuilder builder, ProtocolNegotiatorFactory protocolNegotiator) { + builder.protocolNegotiatorFactory(protocolNegotiator); } public static void setStatsEnabled(NettyChannelBuilder builder, boolean value) { diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 633cb3232..46beec533 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -17,7 +17,6 @@ package io.grpc.netty; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIME_NANOS; @@ -27,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.grpc.Attributes; +import io.grpc.EquivalentAddressGroup; import io.grpc.ExperimentalApi; import io.grpc.Internal; import io.grpc.NameResolver; @@ -80,7 +80,8 @@ public final class NettyChannelBuilder private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED; private long keepAliveTimeoutNanos = DEFAULT_KEEPALIVE_TIMEOUT_NANOS; private boolean keepAliveWithoutCalls; - private TransportCreationParamsFilterFactory dynamicParamsFactory; + private ProtocolNegotiatorFactory protocolNegotiatorFactory; + private LocalSocketPicker localSocketPicker; /** * Creates a new builder with the given server address. This factory method is primarily intended @@ -327,13 +328,49 @@ public final class NettyChannelBuilder return this; } + + /** + * If non-{@code null}, attempts to create connections bound to a local port. + */ + public NettyChannelBuilder localSocketPicker(@Nullable LocalSocketPicker localSocketPicker) { + this.localSocketPicker = localSocketPicker; + return this; + } + + /** + * This class is meant to be overriden with a custom implementation of + * {@link #createSocketAddress}. The default implementation is a no-op. + * + * @since 1.16.0 + */ + @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4917") + public static class LocalSocketPicker { + + /** + * Called by gRPC to pick local socket to bind to. This may be called multiple times. + * Subclasses are expected to override this method. + * + * @param remoteAddress the remote address to connect to. + * @param attrs the Attributes present on the {@link io.grpc.EquivalentAddressGroup} associated + * with the address. + * @return a {@link SocketAddress} suitable for binding, or else {@code null}. + * @since 1.16.0 + */ + @Nullable + public SocketAddress createSocketAddress( + SocketAddress remoteAddress, @EquivalentAddressGroup.Attr Attributes attrs) { + return null; + } + } + @Override @CheckReturnValue @Internal protected ClientTransportFactory buildTransportFactory() { - TransportCreationParamsFilterFactory transportCreationParamsFilterFactory = - dynamicParamsFactory; - if (transportCreationParamsFilterFactory == null) { + ProtocolNegotiator negotiator; + if (protocolNegotiatorFactory != null) { + negotiator = protocolNegotiatorFactory.buildProtocolNegotiator(); + } else { SslContext localSslContext = sslContext; if (negotiationType == NegotiationType.TLS && localSslContext == null) { try { @@ -342,16 +379,13 @@ public final class NettyChannelBuilder throw new RuntimeException(ex); } } - ProtocolNegotiator negotiator = - createProtocolNegotiatorByType(negotiationType, localSslContext); - transportCreationParamsFilterFactory = - new DefaultNettyTransportCreationParamsFilterFactory(negotiator); + negotiator = createProtocolNegotiatorByType(negotiationType, localSslContext); } return new NettyTransportFactory( - transportCreationParamsFilterFactory, channelType, channelOptions, + negotiator, channelType, channelOptions, eventLoopGroup, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, - transportTracerFactory.create()); + transportTracerFactory.create(), localSocketPicker); } @Override @@ -409,8 +443,9 @@ public final class NettyChannelBuilder return super.checkAuthority(authority); } - void setDynamicParamsFactory(TransportCreationParamsFilterFactory factory) { - this.dynamicParamsFactory = checkNotNull(factory, "factory"); + void protocolNegotiatorFactory(ProtocolNegotiatorFactory protocolNegotiatorFactory) { + this.protocolNegotiatorFactory + = Preconditions.checkNotNull(protocolNegotiatorFactory, "protocolNegotiatorFactory"); } @Override @@ -434,24 +469,12 @@ public final class NettyChannelBuilder return this; } - interface TransportCreationParamsFilterFactory { - @CheckReturnValue - TransportCreationParamsFilter create( - SocketAddress targetServerAddress, - String authority, - @Nullable String userAgent, - @Nullable ProxyParameters proxy); - } - - @CheckReturnValue - interface TransportCreationParamsFilter { - SocketAddress getTargetServerAddress(); - - String getAuthority(); - - @Nullable String getUserAgent(); - - ProtocolNegotiator getProtocolNegotiator(); + interface ProtocolNegotiatorFactory { + /** + * Returns a ProtocolNegotatior instance configured for this Builder. This method is called + * during {@code ManagedChannelBuilder#build()}. + */ + ProtocolNegotiator buildProtocolNegotiator(); } /** @@ -459,7 +482,7 @@ public final class NettyChannelBuilder */ @CheckReturnValue private static final class NettyTransportFactory implements ClientTransportFactory { - private final TransportCreationParamsFilterFactory transportCreationParamsFilterFactory; + private final ProtocolNegotiator protocolNegotiator; private final Class<? extends Channel> channelType; private final Map<ChannelOption<?>, ?> channelOptions; private final EventLoopGroup group; @@ -471,15 +494,16 @@ public final class NettyChannelBuilder private final long keepAliveTimeoutNanos; private final boolean keepAliveWithoutCalls; private final TransportTracer transportTracer; + private final LocalSocketPicker localSocketPicker; private boolean closed; - NettyTransportFactory(TransportCreationParamsFilterFactory transportCreationParamsFilterFactory, + NettyTransportFactory(ProtocolNegotiator protocolNegotiator, Class<? extends Channel> channelType, Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, - TransportTracer transportTracer) { - this.transportCreationParamsFilterFactory = transportCreationParamsFilterFactory; + TransportTracer transportTracer, LocalSocketPicker localSocketPicker) { + this.protocolNegotiator = protocolNegotiator; this.channelType = channelType; this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions); this.flowControlWindow = flowControlWindow; @@ -489,6 +513,8 @@ public final class NettyChannelBuilder this.keepAliveTimeoutNanos = keepAliveTimeoutNanos; this.keepAliveWithoutCalls = keepAliveWithoutCalls; this.transportTracer = transportTracer; + this.localSocketPicker = + localSocketPicker != null ? localSocketPicker : new LocalSocketPicker(); usingSharedGroup = group == null; if (usingSharedGroup) { @@ -504,12 +530,13 @@ public final class NettyChannelBuilder SocketAddress serverAddress, ClientTransportOptions options) { checkState(!closed, "The transport factory is closed."); - TransportCreationParamsFilter dparams = - transportCreationParamsFilterFactory.create( - serverAddress, - options.getAuthority(), - options.getUserAgent(), - options.getProxyParameters()); + ProtocolNegotiator localNegotiator = protocolNegotiator; + ProxyParameters proxyParams = options.getProxyParameters(); + if (proxyParams != null) { + localNegotiator = ProtocolNegotiators.httpProxy( + proxyParams.proxyAddress, proxyParams.username, proxyParams.password, + protocolNegotiator); + } final AtomicBackoff.State keepAliveTimeNanosState = keepAliveTimeNanos.getState(); Runnable tooManyPingsRunnable = new Runnable() { @@ -518,12 +545,14 @@ public final class NettyChannelBuilder keepAliveTimeNanosState.backoff(); } }; + NettyClientTransport transport = new NettyClientTransport( - dparams.getTargetServerAddress(), channelType, channelOptions, group, - dparams.getProtocolNegotiator(), flowControlWindow, + serverAddress, channelType, channelOptions, group, + localNegotiator, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, - keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(), - tooManyPingsRunnable, transportTracer, options.getEagAttributes()); + keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), + tooManyPingsRunnable, transportTracer, options.getEagAttributes(), + localSocketPicker); return transport; } @@ -539,73 +568,10 @@ public final class NettyChannelBuilder } closed = true; + protocolNegotiator.close(); if (usingSharedGroup) { SharedResourceHolder.release(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP, group); } } } - - private static final class DefaultNettyTransportCreationParamsFilterFactory - implements TransportCreationParamsFilterFactory { - final ProtocolNegotiator negotiator; - - DefaultNettyTransportCreationParamsFilterFactory(ProtocolNegotiator negotiator) { - this.negotiator = negotiator; - } - - @Override - public TransportCreationParamsFilter create( - SocketAddress targetServerAddress, - String authority, - String userAgent, - ProxyParameters proxyParams) { - ProtocolNegotiator localNegotiator = negotiator; - if (proxyParams != null) { - localNegotiator = ProtocolNegotiators.httpProxy( - proxyParams.proxyAddress, proxyParams.username, proxyParams.password, negotiator); - } - return new DynamicNettyTransportParams( - targetServerAddress, authority, userAgent, localNegotiator); - } - } - - @CheckReturnValue - private static final class DynamicNettyTransportParams implements TransportCreationParamsFilter { - - private final SocketAddress targetServerAddress; - private final String authority; - @Nullable private final String userAgent; - private final ProtocolNegotiator protocolNegotiator; - - private DynamicNettyTransportParams( - SocketAddress targetServerAddress, - String authority, - String userAgent, - ProtocolNegotiator protocolNegotiator) { - this.targetServerAddress = targetServerAddress; - this.authority = authority; - this.userAgent = userAgent; - this.protocolNegotiator = protocolNegotiator; - } - - @Override - public SocketAddress getTargetServerAddress() { - return targetServerAddress; - } - - @Override - public String getAuthority() { - return authority; - } - - @Override - public String getUserAgent() { - return userAgent; - } - - @Override - public ProtocolNegotiator getProtocolNegotiator() { - return protocolNegotiator; - } - } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 6141db2fe..c1cc07850 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -40,6 +40,7 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; +import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -64,7 +65,7 @@ import javax.annotation.Nullable; class NettyClientTransport implements ConnectionClientTransport { private final InternalLogId logId = InternalLogId.allocate(getClass().getName()); private final Map<ChannelOption<?>, ?> channelOptions; - private final SocketAddress address; + private final SocketAddress remoteAddress; private final Class<? extends Channel> channelType; private final EventLoopGroup group; private final ProtocolNegotiator negotiator; @@ -91,6 +92,7 @@ class NettyClientTransport implements ConnectionClientTransport { /** Since not thread-safe, may only be used from event loop. */ private final TransportTracer transportTracer; private final Attributes eagAttributes; + private final LocalSocketPicker localSocketPicker; NettyClientTransport( SocketAddress address, Class<? extends Channel> channelType, @@ -98,9 +100,10 @@ class NettyClientTransport implements ConnectionClientTransport { ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, - Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes) { + Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, + LocalSocketPicker localSocketPicker) { this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); - this.address = Preconditions.checkNotNull(address, "address"); + this.remoteAddress = Preconditions.checkNotNull(address, "address"); this.group = Preconditions.checkNotNull(group, "group"); this.channelType = Preconditions.checkNotNull(channelType, "channelType"); this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions"); @@ -117,6 +120,7 @@ class NettyClientTransport implements ConnectionClientTransport { Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer"); this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); + this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker"); } @Override @@ -263,7 +267,13 @@ class NettyClientTransport implements ConnectionClientTransport { } }); // Start the connection operation to the server. - channel.connect(address); + SocketAddress localAddress = + localSocketPicker.createSocketAddress(remoteAddress, eagAttributes); + if (localAddress != null) { + channel.connect(remoteAddress, localAddress); + } else { + channel.connect(remoteAddress); + } if (keepAliveManager != null) { keepAliveManager.onTransportStarted(); @@ -305,7 +315,7 @@ class NettyClientTransport implements ConnectionClientTransport { public String toString() { return MoreObjects.toStringHelper(this) .add("logId", logId.getId()) - .add("address", address) + .add("remoteAddress", remoteAddress) .add("channel", channel) .toString(); } diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java index 132e96a03..098ba73fb 100644 --- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java +++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java @@ -43,4 +43,11 @@ public interface ProtocolNegotiator { * completed successfully. */ Handler newHandler(GrpcHttp2ConnectionHandler grpcHandler); + + /** + * Releases resources held by this negotiator. Called when the Channel transitions to terminated. + * Is currently only supported on client-side; server-side protocol negotiators will not see this + * method called. + */ + void close(); } diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java index 3a13207cb..061801705 100644 --- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java +++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java @@ -22,12 +22,12 @@ import static io.grpc.netty.GrpcSslContexts.NEXT_PROTOCOL_VERSIONS; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.Attributes; -import io.grpc.CallCredentials; import io.grpc.Grpc; import io.grpc.Internal; import io.grpc.InternalChannelz; import io.grpc.SecurityLevel; import io.grpc.Status; +import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcUtil; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; @@ -90,6 +90,7 @@ public final class ProtocolNegotiators { // Set sttributes before replace to be sure we pass it before accepting any requests. handler.handleProtocolNegotiationCompleted(Attributes.newBuilder() .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) .build(), /*securityInfo=*/ null); // Just replace this handler with the gRPC handler. @@ -104,6 +105,9 @@ public final class ProtocolNegotiators { return new PlaintextHandler(); } + + @Override + public void close() {} }; } @@ -117,6 +121,9 @@ public final class ProtocolNegotiators { public Handler newHandler(GrpcHttp2ConnectionHandler handler) { return new ServerTlsHandler(sslContext, handler); } + + @Override + public void close() {} }; } @@ -157,6 +164,7 @@ public final class ProtocolNegotiators { Attributes.newBuilder() .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) .build(), new InternalChannelz.Security(new InternalChannelz.Tls(session))); // Replace this handler with the GRPC handler. @@ -207,6 +215,13 @@ public final class ProtocolNegotiators { return new BufferUntilProxyTunnelledHandler( proxyHandler, negotiator.newHandler(http2Handler)); } + + // This method is not normally called, because we use httpProxy on a per-connection basis in + // NettyChannelBuilder. Instead, we expect `negotiator' to be closed by NettyTransportFactory. + @Override + public void close() { + negotiator.close(); + } } return new ProxyNegotiator(); @@ -310,6 +325,9 @@ public final class ProtocolNegotiators { }; return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler); } + + @Override + public void close() {} } /** A tuple of (host, port). */ @@ -341,6 +359,9 @@ public final class ProtocolNegotiators { new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000); return new BufferingHttp2UpgradeHandler(upgrader, handler); } + + @Override + public void close() {} } /** @@ -357,6 +378,9 @@ public final class ProtocolNegotiators { public Handler newHandler(GrpcHttp2ConnectionHandler handler) { return new BufferUntilChannelActiveHandler(handler); } + + @Override + public void close() {} } private static RuntimeException unavailableException(String msg) { @@ -651,7 +675,8 @@ public final class ProtocolNegotiators { Attributes.newBuilder() .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) .build(), new InternalChannelz.Security(new InternalChannelz.Tls(session))); writeBufferedAndRemove(ctx); @@ -699,7 +724,8 @@ public final class ProtocolNegotiators { Attributes .newBuilder() .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) .build(), /*securityInfo=*/ null); super.channelActive(ctx); @@ -742,7 +768,8 @@ public final class ProtocolNegotiators { Attributes .newBuilder() .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) + .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) + .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) .build(), /*securityInfo=*/ null); } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) { diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 682af1b9e..7b28fac5c 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -61,6 +61,7 @@ import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.TestUtils; +import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; @@ -77,14 +78,17 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; import javax.net.ssl.SSLHandshakeException; import org.junit.After; import org.junit.Before; @@ -105,6 +109,8 @@ public class NettyClientTransportTest { private ManagedClientTransport.Listener clientTransportListener; private final List<NettyClientTransport> transports = new ArrayList<>(); + private final LinkedBlockingQueue<Attributes> serverTransportAttributesList = + new LinkedBlockingQueue<>(); private final NioEventLoopGroup group = new NioEventLoopGroup(1); private final EchoServerListener serverListener = new EchoServerListener(); private final InternalChannelz channelz = new InternalChannelz(); @@ -175,7 +181,7 @@ public class NettyClientTransportTest { address, NioSocketChannel.class, channelOptions, group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, - tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY); + tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); @@ -415,7 +421,7 @@ public class NettyClientTransportTest { address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, - null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY); + null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker()); transports.add(transport); // Should not throw @@ -536,6 +542,11 @@ public class NettyClientTransportTest { assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION)); assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)); + Attributes serverTransportAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS); + assertNotNull(serverTransportAttrs); + SocketAddress clientAddr = serverTransportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + assertNotNull(clientAddr); + assertEquals(clientAddr, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR)); } @Test @@ -593,7 +604,7 @@ public class NettyClientTransportTest { DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, - new TransportTracer(), eagAttributes); + new TransportTracer(), eagAttributes, new SocketPicker()); transports.add(transport); return transport; } @@ -749,7 +760,7 @@ public class NettyClientTransportTest { } } - private static final class EchoServerListener implements ServerListener { + private final class EchoServerListener implements ServerListener { final List<NettyServerTransport> transports = new ArrayList<>(); final List<EchoServerStreamListener> streamListeners = Collections.synchronizedList(new ArrayList<EchoServerStreamListener>()); @@ -769,6 +780,7 @@ public class NettyClientTransportTest { @Override public Attributes transportReady(Attributes transportAttrs) { + serverTransportAttributesList.add(transportAttrs); return transportAttrs; } @@ -821,5 +833,17 @@ public class NettyClientTransportTest { this.grpcHandler = grpcHandler; return handler = new NoopHandler(grpcHandler); } + + @Override + public void close() {} + } + + private static final class SocketPicker extends LocalSocketPicker { + + @Nullable + @Override + public SocketAddress createSocketAddress(SocketAddress remoteAddress, Attributes attrs) { + return null; + } } } |