diff options
Diffstat (limited to 'netty')
7 files changed, 78 insertions, 46 deletions
diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java index 7fedf3327..5163e76e5 100644 --- a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java +++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java @@ -18,6 +18,7 @@ package io.grpc.netty; import io.grpc.Attributes; import io.grpc.Internal; +import io.grpc.internal.Channelz; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2ConnectionEncoder; @@ -44,14 +45,26 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler } /** + * Same as {@link #handleProtocolNegotiationCompleted(Attributes, Channelz.Security)} + * but with no {@link Channelz.Security}. + * + * @deprecated Use the two argument method instead. + */ + @Deprecated + public void handleProtocolNegotiationCompleted(Attributes attrs) { + handleProtocolNegotiationCompleted(attrs, /*securityInfo=*/ null); + } + + /** * Triggered on protocol negotiation completion. * * <p>It must me called after negotiation is completed but before given handler is added to the * channel. * * @param attrs arbitrary attributes passed after protocol negotiation (eg. SSLSession). + * @param securityInfo informs channelz about the security protocol. */ - public void handleProtocolNegotiationCompleted(Attributes attrs) { + public void handleProtocolNegotiationCompleted(Attributes attrs, Channelz.Security securityInfo) { } /** diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index fd16746b4..5f0d37354 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -27,6 +27,7 @@ import io.grpc.Attributes; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusException; +import io.grpc.internal.Channelz; import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.GrpcUtil; @@ -104,6 +105,7 @@ class NettyClientHandler extends AbstractNettyHandler { private WriteQueue clientWriteQueue; private Http2Ping ping; private Attributes attributes = Attributes.EMPTY; + private Channelz.Security securityInfo; static NettyClientHandler newHandler( ClientTransportLifecycleManager lifecycleManager, @@ -407,9 +409,15 @@ class NettyClientHandler extends AbstractNettyHandler { } @Override - public void handleProtocolNegotiationCompleted(Attributes attributes) { + public void handleProtocolNegotiationCompleted( + Attributes attributes, Channelz.Security securityInfo) { this.attributes = attributes; - super.handleProtocolNegotiationCompleted(attributes); + this.securityInfo = securityInfo; + super.handleProtocolNegotiationCompleted(attributes, securityInfo); + } + + Channelz.Security getSecurityInfo() { + return securityInfo; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0020c54f5..7d963fc8e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -28,7 +28,6 @@ import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; -import io.grpc.internal.Channelz.Security; import io.grpc.internal.Channelz.SocketStats; import io.grpc.internal.ClientStream; import io.grpc.internal.ConnectionClientTransport; @@ -80,7 +79,6 @@ class NettyClientTransport implements ConnectionClientTransport { private final long keepAliveTimeoutNanos; private final boolean keepAliveWithoutCalls; private final Runnable tooManyPingsRunnable; - private ProtocolNegotiator.Handler negotiationHandler; private NettyClientHandler handler; // We should not send on the channel until negotiation completes. This is a hard requirement @@ -320,26 +318,14 @@ class NettyClientTransport implements ConnectionClientTransport { if (channel.eventLoop().inEventLoop()) { // This is necessary, otherwise we will block forever if we get the future from inside // the event loop. - result.set( - new SocketStats( - transportTracer.getStats(), - channel.localAddress(), - channel.remoteAddress(), - Utils.getSocketOptions(channel), - new Security())); + result.set(getStatsHelper(channel)); return result; } channel.eventLoop().submit( new Runnable() { @Override public void run() { - result.set( - new SocketStats( - transportTracer.getStats(), - channel.localAddress(), - channel.remoteAddress(), - Utils.getSocketOptions(channel), - new Security())); + result.set(getStatsHelper(channel)); } }) .addListener( @@ -354,6 +340,16 @@ class NettyClientTransport implements ConnectionClientTransport { return result; } + private SocketStats getStatsHelper(Channel ch) { + assert ch.eventLoop().inEventLoop(); + return new SocketStats( + transportTracer.getStats(), + channel.localAddress(), + channel.remoteAddress(), + Utils.getSocketOptions(ch), + handler == null ? null : handler.getSecurityInfo()); + } + @VisibleForTesting Channel channel() { return channel; diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 47c628e0c..0bacd5081 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -36,6 +36,7 @@ import io.grpc.InternalStatus; import io.grpc.Metadata; import io.grpc.ServerStreamTracer; import io.grpc.Status; +import io.grpc.internal.Channelz; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.LogExceptionRunnable; @@ -112,6 +113,7 @@ class NettyServerHandler extends AbstractNettyHandler { private final KeepAliveEnforcer keepAliveEnforcer; /** Incomplete attributes produced by negotiator. */ private Attributes negotiationAttributes; + private Channelz.Security securityInfo; /** Completed attributes produced by transportReady. */ private Attributes attributes; private Throwable connectionError; @@ -504,8 +506,14 @@ class NettyServerHandler extends AbstractNettyHandler { } @Override - public void handleProtocolNegotiationCompleted(Attributes attrs) { + public void handleProtocolNegotiationCompleted( + Attributes attrs, Channelz.Security securityInfo) { negotiationAttributes = attrs; + this.securityInfo = securityInfo; + } + + Channelz.Security getSecurityInfo() { + return securityInfo; } @VisibleForTesting diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index c76ca6673..5c00e2090 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -59,6 +59,8 @@ class NettyServerTransport implements ServerTransport { private final ChannelPromise channelUnused; private final ProtocolNegotiator protocolNegotiator; private final int maxStreams; + // only accessed from channel event loop + private NettyServerHandler grpcHandler; private ServerTransportListener listener; private boolean terminated; private final int flowControlWindow; @@ -115,7 +117,7 @@ class NettyServerTransport implements ServerTransport { this.listener = listener; // Create the Netty handler for the pipeline. - final NettyServerHandler grpcHandler = createHandler(listener, channelUnused); + grpcHandler = createHandler(listener, channelUnused); NettyHandlerSettings.setAutoWindow(grpcHandler); // Notify when the channel closes. @@ -199,30 +201,17 @@ class NettyServerTransport implements ServerTransport { @Override public ListenableFuture<SocketStats> getStats() { final SettableFuture<SocketStats> result = SettableFuture.create(); - // TODO: fill in security if (channel.eventLoop().inEventLoop()) { // This is necessary, otherwise we will block forever if we get the future from inside // the event loop. - result.set( - new SocketStats( - transportTracer.getStats(), - channel.localAddress(), - channel.remoteAddress(), - Utils.getSocketOptions(channel), - /*security=*/ null)); + result.set(getStatsHelper(channel)); return result; } channel.eventLoop().submit( new Runnable() { @Override public void run() { - result.set( - new SocketStats( - transportTracer.getStats(), - channel.localAddress(), - channel.remoteAddress(), - Utils.getSocketOptions(channel), - /*security=*/ null)); + result.set(getStatsHelper(channel)); } }) .addListener( @@ -237,6 +226,16 @@ class NettyServerTransport implements ServerTransport { return result; } + private SocketStats getStatsHelper(Channel ch) { + Preconditions.checkState(ch.eventLoop().inEventLoop()); + return new SocketStats( + transportTracer.getStats(), + channel.localAddress(), + channel.remoteAddress(), + Utils.getSocketOptions(ch), + grpcHandler == null ? null : grpcHandler.getSecurityInfo()); + } + /** * Creates the Netty handler to be used in the channel pipeline. */ diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java index 398ef8838..b07814a98 100644 --- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java +++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java @@ -25,6 +25,7 @@ import io.grpc.Attributes; import io.grpc.Grpc; import io.grpc.Internal; import io.grpc.Status; +import io.grpc.internal.Channelz; import io.grpc.internal.GrpcUtil; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; @@ -62,6 +63,7 @@ import java.util.logging.Logger; import javax.annotation.Nullable; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSession; /** * Common {@link ProtocolNegotiator}s used by gRPC. @@ -86,7 +88,8 @@ public final class ProtocolNegotiators { // Set sttributes before replace to be sure we pass it before accepting any requests. handler.handleProtocolNegotiationCompleted(Attributes.newBuilder() .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .build()); + .build(), + /*securityInfo=*/ null); // Just replace this handler with the gRPC handler. ctx.pipeline().replace(this, null, handler); } @@ -145,14 +148,15 @@ public final class ProtocolNegotiators { SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt; if (handshakeEvent.isSuccess()) { if (NEXT_PROTOCOL_VERSIONS.contains(sslHandler(ctx.pipeline()).applicationProtocol())) { + SSLSession session = sslHandler(ctx.pipeline()).engine().getSession(); // Successfully negotiated the protocol. // Notify about completion and pass down SSLSession in attributes. grpcHandler.handleProtocolNegotiationCompleted( Attributes.newBuilder() - .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, - sslHandler(ctx.pipeline()).engine().getSession()) + .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .build()); + .build(), + new Channelz.Security(new Channelz.Tls(session))); // Replace this handler with the GRPC handler. ctx.pipeline().replace(this, null, grpcHandler); } else { @@ -634,13 +638,15 @@ public final class ProtocolNegotiators { // will fail before we see the userEvent, and the channel is closed down prematurely. ctx.pipeline().addBefore(ctx.name(), null, grpcHandler); + SSLSession session = handler.engine().getSession(); // Successfully negotiated the protocol. // Notify about completion and pass down SSLSession in attributes. grpcHandler.handleProtocolNegotiationCompleted( Attributes.newBuilder() - .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, handler.engine().getSession()) + .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .build()); + .build(), + new Channelz.Security(new Channelz.Tls(session))); writeBufferedAndRemove(ctx); } else { Exception ex = new Exception( @@ -686,7 +692,8 @@ public final class ProtocolNegotiators { Attributes .newBuilder() .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .build()); + .build(), + /*securityInfo=*/ null); super.channelActive(ctx); } } @@ -727,7 +734,8 @@ public final class ProtocolNegotiators { Attributes .newBuilder() .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) - .build()); + .build(), + /*securityInfo=*/ null); } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) { fail(ctx, unavailableException("HTTP/2 upgrade rejected")); } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index 8be0ba6ec..eb4aa3d5a 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -193,7 +193,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand handler().setKeepAliveManagerForTest(spyKeepAliveManager); // Simulate receipt of the connection preface - handler().handleProtocolNegotiationCompleted(Attributes.EMPTY); + handler().handleProtocolNegotiationCompleted(Attributes.EMPTY, /*securityInfo=*/ null); channelRead(Http2CodecUtil.connectionPrefaceBuf()); // Simulate receipt of initial remote settings. ByteBuf serializedSettings = serializeSettings(new Http2Settings()); @@ -204,7 +204,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand public void transportReadyDelayedUntilConnectionPreface() throws Exception { initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE)); - handler().handleProtocolNegotiationCompleted(Attributes.EMPTY); + handler().handleProtocolNegotiationCompleted(Attributes.EMPTY, /*securityInfo=*/ null); verify(transportListener, never()).transportReady(any(Attributes.class)); // Simulate receipt of the connection preface |