diff options
author | Jiangtao Li <jiangtao@google.com> | 2018-08-29 10:09:09 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-29 10:09:09 -0700 |
commit | 433ac00de43f334104338ba712415e7dc0c0c538 (patch) | |
tree | da5e5486f4ddc99f8c9c3cd9e9338729336f90ea /alts | |
parent | 87513d8e83ac33b9deb1e47ae84a02f6a99424d9 (diff) | |
download | grpc-grpc-java-433ac00de43f334104338ba712415e7dc0c0c538.tar.gz |
alts: convert handshaker service channel to SharedResourceHolder (#4802)
alts: convert handshaker service channel to SharedResourceHolder
Diffstat (limited to 'alts')
4 files changed, 70 insertions, 39 deletions
diff --git a/alts/src/main/java/io/grpc/alts/AltsChannelBuilder.java b/alts/src/main/java/io/grpc/alts/AltsChannelBuilder.java index c560a508f..f0346fad7 100644 --- a/alts/src/main/java/io/grpc/alts/AltsChannelBuilder.java +++ b/alts/src/main/java/io/grpc/alts/AltsChannelBuilder.java @@ -37,7 +37,9 @@ import io.grpc.alts.internal.RpcProtocolVersionsUtil; import io.grpc.alts.internal.TsiHandshaker; import io.grpc.alts.internal.TsiHandshakerFactory; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ObjectPool; import io.grpc.internal.ProxyParameters; +import io.grpc.internal.SharedResourcePool; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilter; import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilterFactory; @@ -60,6 +62,8 @@ public final class AltsChannelBuilder extends ForwardingChannelBuilder<AltsChann private final NettyChannelBuilder delegate; private final AltsClientOptions.Builder handshakerOptionsBuilder = new AltsClientOptions.Builder(); + private ObjectPool<ManagedChannel> handshakerChannelPool = + SharedResourcePool.forResource(HandshakerServiceChannel.SHARED_HANDSHAKER_CHANNEL); private TcpfFactory tcpfFactoryForTest; private boolean enableUntrustedAlts; @@ -109,7 +113,10 @@ public final class AltsChannelBuilder extends ForwardingChannelBuilder<AltsChann /** Sets a new handshaker service address for testing. */ public AltsChannelBuilder setHandshakerAddressForTesting(String handshakerAddress) { - HandshakerServiceChannel.setHandshakerAddressForTesting(handshakerAddress); + // Instead of using the default shared channel to the handshaker service, create a fix object + // pool of handshaker service channel for testing. + handshakerChannelPool = + HandshakerServiceChannel.getHandshakerChannelPoolForTesting(handshakerAddress); return this; } @@ -164,9 +171,11 @@ public final class AltsChannelBuilder extends ForwardingChannelBuilder<AltsChann @Override public TsiHandshaker newHandshaker() { // Used the shared grpc channel to connecting to the ALTS handshaker service. - ManagedChannel channel = HandshakerServiceChannel.get(); + // TODO: Release the channel if it is not used. + // https://github.com/grpc/grpc-java/issues/4755. return AltsTsiHandshaker.newClient( - HandshakerServiceGrpc.newStub(channel), handshakerOptions); + HandshakerServiceGrpc.newStub(handshakerChannelPool.getObject()), + handshakerOptions); } }; diff --git a/alts/src/main/java/io/grpc/alts/AltsServerBuilder.java b/alts/src/main/java/io/grpc/alts/AltsServerBuilder.java index 261da4748..a26443178 100644 --- a/alts/src/main/java/io/grpc/alts/AltsServerBuilder.java +++ b/alts/src/main/java/io/grpc/alts/AltsServerBuilder.java @@ -21,6 +21,7 @@ import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.ExperimentalApi; import io.grpc.HandlerRegistry; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerBuilder; @@ -39,6 +40,8 @@ import io.grpc.alts.internal.HandshakerServiceGrpc; import io.grpc.alts.internal.RpcProtocolVersionsUtil; import io.grpc.alts.internal.TsiHandshaker; import io.grpc.alts.internal.TsiHandshakerFactory; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.SharedResourcePool; import io.grpc.netty.NettyServerBuilder; import java.io.File; import java.net.InetSocketAddress; @@ -56,6 +59,8 @@ public final class AltsServerBuilder extends ServerBuilder<AltsServerBuilder> { private static final Logger logger = Logger.getLogger(AltsServerBuilder.class.getName()); private final NettyServerBuilder delegate; + private ObjectPool<ManagedChannel> handshakerChannelPool = + SharedResourcePool.forResource(HandshakerServiceChannel.SHARED_HANDSHAKER_CHANNEL); private boolean enableUntrustedAlts; private AltsServerBuilder(NettyServerBuilder nettyDelegate) { @@ -85,7 +90,10 @@ public final class AltsServerBuilder extends ServerBuilder<AltsServerBuilder> { /** Sets a new handshaker service address for testing. */ public AltsServerBuilder setHandshakerAddressForTesting(String handshakerAddress) { - HandshakerServiceChannel.setHandshakerAddressForTesting(handshakerAddress); + // Instead of using the default shared channel to the handshaker service, create a fix object + // pool of handshaker service channel for testing. + handshakerChannelPool = + HandshakerServiceChannel.getHandshakerChannelPoolForTesting(handshakerAddress); return this; } @@ -194,8 +202,10 @@ public final class AltsServerBuilder extends ServerBuilder<AltsServerBuilder> { @Override public TsiHandshaker newHandshaker() { // Used the shared grpc channel to connecting to the ALTS handshaker service. + // TODO: Release the channel if it is not used. + // https://github.com/grpc/grpc-java/issues/4755. return AltsTsiHandshaker.newServer( - HandshakerServiceGrpc.newStub(HandshakerServiceChannel.get()), + HandshakerServiceGrpc.newStub(handshakerChannelPool.getObject()), new AltsHandshakerOptions(RpcProtocolVersionsUtil.getRpcProtocolVersions())); } })); diff --git a/alts/src/main/java/io/grpc/alts/GoogleDefaultChannelBuilder.java b/alts/src/main/java/io/grpc/alts/GoogleDefaultChannelBuilder.java index 3415e4875..e903bb716 100644 --- a/alts/src/main/java/io/grpc/alts/GoogleDefaultChannelBuilder.java +++ b/alts/src/main/java/io/grpc/alts/GoogleDefaultChannelBuilder.java @@ -40,6 +40,7 @@ import io.grpc.alts.internal.TsiHandshakerFactory; import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ProxyParameters; +import io.grpc.internal.SharedResourceHolder; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.InternalNettyChannelBuilder.TransportCreationParamsFilter; @@ -115,7 +116,10 @@ public final class GoogleDefaultChannelBuilder @Override public TsiHandshaker newHandshaker() { // Used the shared grpc channel to connecting to the ALTS handshaker service. - ManagedChannel channel = HandshakerServiceChannel.get(); + // TODO: Release the channel if it is not used. + // https://github.com/grpc/grpc-java/issues/4755. + ManagedChannel channel = + SharedResourceHolder.get(HandshakerServiceChannel.SHARED_HANDSHAKER_CHANNEL); return AltsTsiHandshaker.newClient( HandshakerServiceGrpc.newStub(channel), handshakerOptions); } diff --git a/alts/src/main/java/io/grpc/alts/HandshakerServiceChannel.java b/alts/src/main/java/io/grpc/alts/HandshakerServiceChannel.java index 7c640e19e..af1cc928e 100644 --- a/alts/src/main/java/io/grpc/alts/HandshakerServiceChannel.java +++ b/alts/src/main/java/io/grpc/alts/HandshakerServiceChannel.java @@ -16,43 +16,58 @@ package io.grpc.alts; -import com.google.common.base.Preconditions; import io.grpc.ManagedChannel; +import io.grpc.internal.FixedObjectPool; +import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.netty.NettyChannelBuilder; +import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.ThreadFactory; /** - * Class for creating a single shared grpc channel to the ALTS Handshaker Service. The channel to - * the handshaker service is local and is over plaintext. Each application will have at most one - * connection to the handshaker service. - * - * <p>TODO: Release the channel if it is not used. https://github.com/grpc/grpc-java/issues/4755. + * Class for creating a single shared gRPC channel to the ALTS Handshaker Service using + * SharedResourceHolder. The channel to the handshaker service is local and is over plaintext. Each + * application will have at most one connection to the handshaker service. */ final class HandshakerServiceChannel { - // Default handshaker service address. - private static String handshakerAddress = "metadata.google.internal:8080"; - // Shared channel to ALTS handshaker service. - private static ManagedChannel channel = null; - // Construct me not! - private HandshakerServiceChannel() {} + static final Resource<ManagedChannel> SHARED_HANDSHAKER_CHANNEL = + new Resource<ManagedChannel>() { - // Sets handshaker service address for testing and creates the channel to the handshaker service. - public static synchronized void setHandshakerAddressForTesting(String handshakerAddress) { - Preconditions.checkState( - channel == null || HandshakerServiceChannel.handshakerAddress.equals(handshakerAddress), - "HandshakerServiceChannel already created with a different handshakerAddress"); - HandshakerServiceChannel.handshakerAddress = handshakerAddress; - if (channel == null) { - channel = createChannel(); - } - } + private EventLoopGroup eventGroup = null; + + @Override + public ManagedChannel create() { + /* Use its own event loop thread pool to avoid blocking. */ + if (eventGroup == null) { + eventGroup = + new NioEventLoopGroup(1, new DefaultThreadFactory("handshaker pool", true)); + } + return NettyChannelBuilder.forTarget("metadata.google.internal:8080") + .directExecutor() + .eventLoopGroup(eventGroup) + .usePlaintext() + .build(); + } + + @Override + public void close(ManagedChannel instance) { + instance.shutdownNow(); + if (eventGroup != null) { + eventGroup.shutdownGracefully(); + } + } - /** Create a new channel to ALTS handshaker service, if it has not been created yet. */ - private static ManagedChannel createChannel() { - /* Use its own event loop thread pool to avoid blocking. */ + @Override + public String toString() { + return "grpc-alts-handshaker-service-channel"; + } + }; + + /** Returns a fixed object pool of handshaker service channel for testing only. */ + static FixedObjectPool<ManagedChannel> getHandshakerChannelPoolForTesting( + String handshakerAddress) { ThreadFactory clientThreadFactory = new DefaultThreadFactory("handshaker pool", true); ManagedChannel channel = NettyChannelBuilder.forTarget(handshakerAddress) @@ -60,13 +75,6 @@ final class HandshakerServiceChannel { .eventLoopGroup(new NioEventLoopGroup(1, clientThreadFactory)) .usePlaintext() .build(); - return channel; - } - - public static synchronized ManagedChannel get() { - if (channel == null) { - channel = createChannel(); - } - return channel; + return new FixedObjectPool<ManagedChannel>(channel); } } |