aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
Diffstat (limited to 'netty')
-rw-r--r--netty/shaded/Android.bp15
-rw-r--r--netty/shaded/grpc-netty-shaded-1.14.0.jarbin6401211 -> 0 bytes
-rw-r--r--netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java29
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java184
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientTransport.java20
-rw-r--r--netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java7
-rw-r--r--netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java35
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java32
8 files changed, 174 insertions, 148 deletions
diff --git a/netty/shaded/Android.bp b/netty/shaded/Android.bp
index 346371f1e..5aecf42ba 100644
--- a/netty/shaded/Android.bp
+++ b/netty/shaded/Android.bp
@@ -1,5 +1,5 @@
//
-// Copyright (C) 2018 The Android Open Source Project
+// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,10 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
-// TODO: Build from source instead
-java_import_host {
+
+// The below module imports the Netty shaded binary from the Maven repository
+// since it's not trivial to build from source due to the large number of
+// external dependencies and missing build-system support.
+//
+// WARNING: The artifact version must match the source to avoid runtime issues.
+java_library_host {
name: "grpc-java-netty-shaded",
- jars: [
- "grpc-netty-shaded-1.14.0.jar",
+ static_libs: [
+ "grpc-netty-shaded-1.16.1-jar",
],
}
diff --git a/netty/shaded/grpc-netty-shaded-1.14.0.jar b/netty/shaded/grpc-netty-shaded-1.14.0.jar
deleted file mode 100644
index 230b7de90..000000000
--- a/netty/shaded/grpc-netty-shaded-1.14.0.jar
+++ /dev/null
Binary files differ
diff --git a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java
index 4711af97b..bbceb364e 100644
--- a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java
@@ -18,8 +18,6 @@ package io.grpc.netty;
import io.grpc.Internal;
import io.grpc.internal.ClientTransportFactory;
-import io.grpc.internal.ProxyParameters;
-import java.net.SocketAddress;
/**
* Internal {@link NettyChannelBuilder} accessor. This is intended for usage internal to the gRPC
@@ -39,28 +37,17 @@ public final class InternalNettyChannelBuilder {
channelBuilder.overrideAuthorityChecker(authorityChecker);
}
- /**
- * Interface to create netty dynamic parameters.
- */
- public interface TransportCreationParamsFilterFactory
- extends NettyChannelBuilder.TransportCreationParamsFilterFactory {
- @Override
- TransportCreationParamsFilter create(
- SocketAddress targetServerAddress, String authority, String userAgent,
- ProxyParameters proxy);
- }
+ /** A class that provides a Netty handler to control protocol negotiation. */
+ public interface ProtocolNegotiatorFactory
+ extends NettyChannelBuilder.ProtocolNegotiatorFactory {}
/**
- * {@link TransportCreationParamsFilter} are those that may depend on late-known information about
- * a client transport. This interface can be used to dynamically alter params based on the
- * params of {@code ClientTransportFactory#newClientTransport}.
+ * Sets the {@link ProtocolNegotiatorFactory} to be used. Overrides any specified negotiation type
+ * and {@code SslContext}.
*/
- public interface TransportCreationParamsFilter
- extends NettyChannelBuilder.TransportCreationParamsFilter {}
-
- public static void setDynamicTransportParamsFactory(
- NettyChannelBuilder builder, TransportCreationParamsFilterFactory factory) {
- builder.setDynamicParamsFactory(factory);
+ public static void setProtocolNegotiatorFactory(
+ NettyChannelBuilder builder, ProtocolNegotiatorFactory protocolNegotiator) {
+ builder.protocolNegotiatorFactory(protocolNegotiator);
}
public static void setStatsEnabled(NettyChannelBuilder builder, boolean value) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 633cb3232..46beec533 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -17,7 +17,6 @@
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIME_NANOS;
@@ -27,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.Attributes;
+import io.grpc.EquivalentAddressGroup;
import io.grpc.ExperimentalApi;
import io.grpc.Internal;
import io.grpc.NameResolver;
@@ -80,7 +80,8 @@ public final class NettyChannelBuilder
private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED;
private long keepAliveTimeoutNanos = DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
private boolean keepAliveWithoutCalls;
- private TransportCreationParamsFilterFactory dynamicParamsFactory;
+ private ProtocolNegotiatorFactory protocolNegotiatorFactory;
+ private LocalSocketPicker localSocketPicker;
/**
* Creates a new builder with the given server address. This factory method is primarily intended
@@ -327,13 +328,49 @@ public final class NettyChannelBuilder
return this;
}
+
+ /**
+ * If non-{@code null}, attempts to create connections bound to a local port.
+ */
+ public NettyChannelBuilder localSocketPicker(@Nullable LocalSocketPicker localSocketPicker) {
+ this.localSocketPicker = localSocketPicker;
+ return this;
+ }
+
+ /**
+ * This class is meant to be overriden with a custom implementation of
+ * {@link #createSocketAddress}. The default implementation is a no-op.
+ *
+ * @since 1.16.0
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4917")
+ public static class LocalSocketPicker {
+
+ /**
+ * Called by gRPC to pick local socket to bind to. This may be called multiple times.
+ * Subclasses are expected to override this method.
+ *
+ * @param remoteAddress the remote address to connect to.
+ * @param attrs the Attributes present on the {@link io.grpc.EquivalentAddressGroup} associated
+ * with the address.
+ * @return a {@link SocketAddress} suitable for binding, or else {@code null}.
+ * @since 1.16.0
+ */
+ @Nullable
+ public SocketAddress createSocketAddress(
+ SocketAddress remoteAddress, @EquivalentAddressGroup.Attr Attributes attrs) {
+ return null;
+ }
+ }
+
@Override
@CheckReturnValue
@Internal
protected ClientTransportFactory buildTransportFactory() {
- TransportCreationParamsFilterFactory transportCreationParamsFilterFactory =
- dynamicParamsFactory;
- if (transportCreationParamsFilterFactory == null) {
+ ProtocolNegotiator negotiator;
+ if (protocolNegotiatorFactory != null) {
+ negotiator = protocolNegotiatorFactory.buildProtocolNegotiator();
+ } else {
SslContext localSslContext = sslContext;
if (negotiationType == NegotiationType.TLS && localSslContext == null) {
try {
@@ -342,16 +379,13 @@ public final class NettyChannelBuilder
throw new RuntimeException(ex);
}
}
- ProtocolNegotiator negotiator =
- createProtocolNegotiatorByType(negotiationType, localSslContext);
- transportCreationParamsFilterFactory =
- new DefaultNettyTransportCreationParamsFilterFactory(negotiator);
+ negotiator = createProtocolNegotiatorByType(negotiationType, localSslContext);
}
return new NettyTransportFactory(
- transportCreationParamsFilterFactory, channelType, channelOptions,
+ negotiator, channelType, channelOptions,
eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
- transportTracerFactory.create());
+ transportTracerFactory.create(), localSocketPicker);
}
@Override
@@ -409,8 +443,9 @@ public final class NettyChannelBuilder
return super.checkAuthority(authority);
}
- void setDynamicParamsFactory(TransportCreationParamsFilterFactory factory) {
- this.dynamicParamsFactory = checkNotNull(factory, "factory");
+ void protocolNegotiatorFactory(ProtocolNegotiatorFactory protocolNegotiatorFactory) {
+ this.protocolNegotiatorFactory
+ = Preconditions.checkNotNull(protocolNegotiatorFactory, "protocolNegotiatorFactory");
}
@Override
@@ -434,24 +469,12 @@ public final class NettyChannelBuilder
return this;
}
- interface TransportCreationParamsFilterFactory {
- @CheckReturnValue
- TransportCreationParamsFilter create(
- SocketAddress targetServerAddress,
- String authority,
- @Nullable String userAgent,
- @Nullable ProxyParameters proxy);
- }
-
- @CheckReturnValue
- interface TransportCreationParamsFilter {
- SocketAddress getTargetServerAddress();
-
- String getAuthority();
-
- @Nullable String getUserAgent();
-
- ProtocolNegotiator getProtocolNegotiator();
+ interface ProtocolNegotiatorFactory {
+ /**
+ * Returns a ProtocolNegotatior instance configured for this Builder. This method is called
+ * during {@code ManagedChannelBuilder#build()}.
+ */
+ ProtocolNegotiator buildProtocolNegotiator();
}
/**
@@ -459,7 +482,7 @@ public final class NettyChannelBuilder
*/
@CheckReturnValue
private static final class NettyTransportFactory implements ClientTransportFactory {
- private final TransportCreationParamsFilterFactory transportCreationParamsFilterFactory;
+ private final ProtocolNegotiator protocolNegotiator;
private final Class<? extends Channel> channelType;
private final Map<ChannelOption<?>, ?> channelOptions;
private final EventLoopGroup group;
@@ -471,15 +494,16 @@ public final class NettyChannelBuilder
private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls;
private final TransportTracer transportTracer;
+ private final LocalSocketPicker localSocketPicker;
private boolean closed;
- NettyTransportFactory(TransportCreationParamsFilterFactory transportCreationParamsFilterFactory,
+ NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
Class<? extends Channel> channelType, Map<ChannelOption<?>, ?> channelOptions,
EventLoopGroup group, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
- TransportTracer transportTracer) {
- this.transportCreationParamsFilterFactory = transportCreationParamsFilterFactory;
+ TransportTracer transportTracer, LocalSocketPicker localSocketPicker) {
+ this.protocolNegotiator = protocolNegotiator;
this.channelType = channelType;
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.flowControlWindow = flowControlWindow;
@@ -489,6 +513,8 @@ public final class NettyChannelBuilder
this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
this.keepAliveWithoutCalls = keepAliveWithoutCalls;
this.transportTracer = transportTracer;
+ this.localSocketPicker =
+ localSocketPicker != null ? localSocketPicker : new LocalSocketPicker();
usingSharedGroup = group == null;
if (usingSharedGroup) {
@@ -504,12 +530,13 @@ public final class NettyChannelBuilder
SocketAddress serverAddress, ClientTransportOptions options) {
checkState(!closed, "The transport factory is closed.");
- TransportCreationParamsFilter dparams =
- transportCreationParamsFilterFactory.create(
- serverAddress,
- options.getAuthority(),
- options.getUserAgent(),
- options.getProxyParameters());
+ ProtocolNegotiator localNegotiator = protocolNegotiator;
+ ProxyParameters proxyParams = options.getProxyParameters();
+ if (proxyParams != null) {
+ localNegotiator = ProtocolNegotiators.httpProxy(
+ proxyParams.proxyAddress, proxyParams.username, proxyParams.password,
+ protocolNegotiator);
+ }
final AtomicBackoff.State keepAliveTimeNanosState = keepAliveTimeNanos.getState();
Runnable tooManyPingsRunnable = new Runnable() {
@@ -518,12 +545,14 @@ public final class NettyChannelBuilder
keepAliveTimeNanosState.backoff();
}
};
+
NettyClientTransport transport = new NettyClientTransport(
- dparams.getTargetServerAddress(), channelType, channelOptions, group,
- dparams.getProtocolNegotiator(), flowControlWindow,
+ serverAddress, channelType, channelOptions, group,
+ localNegotiator, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
- keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
- tooManyPingsRunnable, transportTracer, options.getEagAttributes());
+ keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
+ tooManyPingsRunnable, transportTracer, options.getEagAttributes(),
+ localSocketPicker);
return transport;
}
@@ -539,73 +568,10 @@ public final class NettyChannelBuilder
}
closed = true;
+ protocolNegotiator.close();
if (usingSharedGroup) {
SharedResourceHolder.release(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP, group);
}
}
}
-
- private static final class DefaultNettyTransportCreationParamsFilterFactory
- implements TransportCreationParamsFilterFactory {
- final ProtocolNegotiator negotiator;
-
- DefaultNettyTransportCreationParamsFilterFactory(ProtocolNegotiator negotiator) {
- this.negotiator = negotiator;
- }
-
- @Override
- public TransportCreationParamsFilter create(
- SocketAddress targetServerAddress,
- String authority,
- String userAgent,
- ProxyParameters proxyParams) {
- ProtocolNegotiator localNegotiator = negotiator;
- if (proxyParams != null) {
- localNegotiator = ProtocolNegotiators.httpProxy(
- proxyParams.proxyAddress, proxyParams.username, proxyParams.password, negotiator);
- }
- return new DynamicNettyTransportParams(
- targetServerAddress, authority, userAgent, localNegotiator);
- }
- }
-
- @CheckReturnValue
- private static final class DynamicNettyTransportParams implements TransportCreationParamsFilter {
-
- private final SocketAddress targetServerAddress;
- private final String authority;
- @Nullable private final String userAgent;
- private final ProtocolNegotiator protocolNegotiator;
-
- private DynamicNettyTransportParams(
- SocketAddress targetServerAddress,
- String authority,
- String userAgent,
- ProtocolNegotiator protocolNegotiator) {
- this.targetServerAddress = targetServerAddress;
- this.authority = authority;
- this.userAgent = userAgent;
- this.protocolNegotiator = protocolNegotiator;
- }
-
- @Override
- public SocketAddress getTargetServerAddress() {
- return targetServerAddress;
- }
-
- @Override
- public String getAuthority() {
- return authority;
- }
-
- @Override
- public String getUserAgent() {
- return userAgent;
- }
-
- @Override
- public ProtocolNegotiator getProtocolNegotiator() {
- return protocolNegotiator;
- }
- }
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 6141db2fe..c1cc07850 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -40,6 +40,7 @@ import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
+import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -64,7 +65,7 @@ import javax.annotation.Nullable;
class NettyClientTransport implements ConnectionClientTransport {
private final InternalLogId logId = InternalLogId.allocate(getClass().getName());
private final Map<ChannelOption<?>, ?> channelOptions;
- private final SocketAddress address;
+ private final SocketAddress remoteAddress;
private final Class<? extends Channel> channelType;
private final EventLoopGroup group;
private final ProtocolNegotiator negotiator;
@@ -91,6 +92,7 @@ class NettyClientTransport implements ConnectionClientTransport {
/** Since not thread-safe, may only be used from event loop. */
private final TransportTracer transportTracer;
private final Attributes eagAttributes;
+ private final LocalSocketPicker localSocketPicker;
NettyClientTransport(
SocketAddress address, Class<? extends Channel> channelType,
@@ -98,9 +100,10 @@ class NettyClientTransport implements ConnectionClientTransport {
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
- Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes) {
+ Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes,
+ LocalSocketPicker localSocketPicker) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
- this.address = Preconditions.checkNotNull(address, "address");
+ this.remoteAddress = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
@@ -117,6 +120,7 @@ class NettyClientTransport implements ConnectionClientTransport {
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
+ this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker");
}
@Override
@@ -263,7 +267,13 @@ class NettyClientTransport implements ConnectionClientTransport {
}
});
// Start the connection operation to the server.
- channel.connect(address);
+ SocketAddress localAddress =
+ localSocketPicker.createSocketAddress(remoteAddress, eagAttributes);
+ if (localAddress != null) {
+ channel.connect(remoteAddress, localAddress);
+ } else {
+ channel.connect(remoteAddress);
+ }
if (keepAliveManager != null) {
keepAliveManager.onTransportStarted();
@@ -305,7 +315,7 @@ class NettyClientTransport implements ConnectionClientTransport {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("logId", logId.getId())
- .add("address", address)
+ .add("remoteAddress", remoteAddress)
.add("channel", channel)
.toString();
}
diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java
index 132e96a03..098ba73fb 100644
--- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java
+++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiator.java
@@ -43,4 +43,11 @@ public interface ProtocolNegotiator {
* completed successfully.
*/
Handler newHandler(GrpcHttp2ConnectionHandler grpcHandler);
+
+ /**
+ * Releases resources held by this negotiator. Called when the Channel transitions to terminated.
+ * Is currently only supported on client-side; server-side protocol negotiators will not see this
+ * method called.
+ */
+ void close();
}
diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
index 3a13207cb..061801705 100644
--- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
+++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
@@ -22,12 +22,12 @@ import static io.grpc.netty.GrpcSslContexts.NEXT_PROTOCOL_VERSIONS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Attributes;
-import io.grpc.CallCredentials;
import io.grpc.Grpc;
import io.grpc.Internal;
import io.grpc.InternalChannelz;
import io.grpc.SecurityLevel;
import io.grpc.Status;
+import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
@@ -90,6 +90,7 @@ public final class ProtocolNegotiators {
// Set sttributes before replace to be sure we pass it before accepting any requests.
handler.handleProtocolNegotiationCompleted(Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
+ .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.build(),
/*securityInfo=*/ null);
// Just replace this handler with the gRPC handler.
@@ -104,6 +105,9 @@ public final class ProtocolNegotiators {
return new PlaintextHandler();
}
+
+ @Override
+ public void close() {}
};
}
@@ -117,6 +121,9 @@ public final class ProtocolNegotiators {
public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
return new ServerTlsHandler(sslContext, handler);
}
+
+ @Override
+ public void close() {}
};
}
@@ -157,6 +164,7 @@ public final class ProtocolNegotiators {
Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
+ .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
.build(),
new InternalChannelz.Security(new InternalChannelz.Tls(session)));
// Replace this handler with the GRPC handler.
@@ -207,6 +215,13 @@ public final class ProtocolNegotiators {
return new BufferUntilProxyTunnelledHandler(
proxyHandler, negotiator.newHandler(http2Handler));
}
+
+ // This method is not normally called, because we use httpProxy on a per-connection basis in
+ // NettyChannelBuilder. Instead, we expect `negotiator' to be closed by NettyTransportFactory.
+ @Override
+ public void close() {
+ negotiator.close();
+ }
}
return new ProxyNegotiator();
@@ -310,6 +325,9 @@ public final class ProtocolNegotiators {
};
return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler);
}
+
+ @Override
+ public void close() {}
}
/** A tuple of (host, port). */
@@ -341,6 +359,9 @@ public final class ProtocolNegotiators {
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
return new BufferingHttp2UpgradeHandler(upgrader, handler);
}
+
+ @Override
+ public void close() {}
}
/**
@@ -357,6 +378,9 @@ public final class ProtocolNegotiators {
public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
return new BufferUntilChannelActiveHandler(handler);
}
+
+ @Override
+ public void close() {}
}
private static RuntimeException unavailableException(String msg) {
@@ -651,7 +675,8 @@ public final class ProtocolNegotiators {
Attributes.newBuilder()
.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
- .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
+ .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
+ .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.build(),
new InternalChannelz.Security(new InternalChannelz.Tls(session)));
writeBufferedAndRemove(ctx);
@@ -699,7 +724,8 @@ public final class ProtocolNegotiators {
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
- .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
+ .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
+ .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
.build(),
/*securityInfo=*/ null);
super.channelActive(ctx);
@@ -742,7 +768,8 @@ public final class ProtocolNegotiators {
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
- .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
+ .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
+ .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
.build(),
/*securityInfo=*/ null);
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 682af1b9e..7b28fac5c 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -61,6 +61,7 @@ import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestUtils;
+import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -77,14 +78,17 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLHandshakeException;
import org.junit.After;
import org.junit.Before;
@@ -105,6 +109,8 @@ public class NettyClientTransportTest {
private ManagedClientTransport.Listener clientTransportListener;
private final List<NettyClientTransport> transports = new ArrayList<>();
+ private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
+ new LinkedBlockingQueue<>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final EchoServerListener serverListener = new EchoServerListener();
private final InternalChannelz channelz = new InternalChannelz();
@@ -175,7 +181,7 @@ public class NettyClientTransportTest {
address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
- tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY);
+ tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker());
transports.add(transport);
callMeMaybe(transport.start(clientTransportListener));
@@ -415,7 +421,7 @@ public class NettyClientTransportTest {
address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
- null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY);
+ null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker());
transports.add(transport);
// Should not throw
@@ -536,6 +542,11 @@ public class NettyClientTransportTest {
assertNotNull(rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION));
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
+ Attributes serverTransportAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
+ assertNotNull(serverTransportAttrs);
+ SocketAddress clientAddr = serverTransportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
+ assertNotNull(clientAddr);
+ assertEquals(clientAddr, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
}
@Test
@@ -593,7 +604,7 @@ public class NettyClientTransportTest {
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
- new TransportTracer(), eagAttributes);
+ new TransportTracer(), eagAttributes, new SocketPicker());
transports.add(transport);
return transport;
}
@@ -749,7 +760,7 @@ public class NettyClientTransportTest {
}
}
- private static final class EchoServerListener implements ServerListener {
+ private final class EchoServerListener implements ServerListener {
final List<NettyServerTransport> transports = new ArrayList<>();
final List<EchoServerStreamListener> streamListeners =
Collections.synchronizedList(new ArrayList<EchoServerStreamListener>());
@@ -769,6 +780,7 @@ public class NettyClientTransportTest {
@Override
public Attributes transportReady(Attributes transportAttrs) {
+ serverTransportAttributesList.add(transportAttrs);
return transportAttrs;
}
@@ -821,5 +833,17 @@ public class NettyClientTransportTest {
this.grpcHandler = grpcHandler;
return handler = new NoopHandler(grpcHandler);
}
+
+ @Override
+ public void close() {}
+ }
+
+ private static final class SocketPicker extends LocalSocketPicker {
+
+ @Nullable
+ @Override
+ public SocketAddress createSocketAddress(SocketAddress remoteAddress, Attributes attrs) {
+ return null;
+ }
}
}