diff options
author | Eric Gribkoff <ericgribkoff@google.com> | 2017-10-06 11:18:03 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-06 11:18:03 -0700 |
commit | 4d67c3d63fa93e14f9c6fe2e05a156e9471006e8 (patch) | |
tree | 44c2d7b3b62048c447cebc22684ee43454190c01 /cronet | |
parent | b07c70a09f2e45e578d315d7aa9f0f9fedda6d1a (diff) | |
download | grpc-grpc-java-4d67c3d63fa93e14f9c6fe2e05a156e9471006e8.tar.gz |
cronet: open-source experimental Cronet transport code
Diffstat (limited to 'cronet')
7 files changed, 1125 insertions, 0 deletions
diff --git a/cronet/README.md b/cronet/README.md new file mode 100644 index 000000000..1103f31f6 --- /dev/null +++ b/cronet/README.md @@ -0,0 +1,46 @@ +gRPC Cronet Transport +======================== + +**EXPERIMENTAL:** *gRPC's Cronet transport is an experimental API, and is not +yet integrated with our build system. Using Cronet with gRPC requires manually +integrating the Cronet libraries and the gRPC code in this directory into your +Android application.* + +This code enables using the [Chromium networking stack +(Cronet)](https://chromium.googlesource.com/chromium/src/+/master/components/cronet) +as the transport layer for gRPC on Android. This lets your Android app make +RPCs using the same networking stack as used in the Chrome browser. + +Some advantages of using Cronet with gRPC: +* Bundles an OpenSSL implementation, enabling TLS connections even on older + versions of Android without additional configuration +* Robust to Android network connectivity changes +* Support for [QUIC](https://www.chromium.org/quic) + +Cronet jars are not currently available on Maven. The instructions at +https://github.com/GoogleChrome/cronet-sample/blob/master/README.md describe +how to manually download the Cronet binaries and add them to your Android +application. You will also need to copy the gRPC source files contained in this +directory into your application's code, as we do not currently provide a +`grpc-cronet` dependency. + +To use Cronet, you must have the `ACCESS_NETWORK_STATE` permission set in +`AndroidManifest.xml`: + +``` +<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> +``` + +Once the above steps are completed, you can create a gRPC Cronet channel as +follows: + +``` +import io.grpc.cronet.CronetChannelBuilder; +import org.chromium.net.ExperimentalCronetEngine; + +... + +ExperimentalCronetEngine engine = + new ExperimentalCronetEngine.Builder(context /* Android Context */).build(); +ManagedChannel channel = CronetChannelBuilder.forAddress("localhost", 8080, engine).build(); +``` diff --git a/cronet/src/main/java/io/grpc/cronet/CronetCallOptions.java b/cronet/src/main/java/io/grpc/cronet/CronetCallOptions.java new file mode 100644 index 000000000..a3d43f52f --- /dev/null +++ b/cronet/src/main/java/io/grpc/cronet/CronetCallOptions.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.cronet; + +import io.grpc.CallOptions; + +/** Call options for use with the Cronet transport. */ +public final class CronetCallOptions { + private CronetCallOptions() {} + + /** + * Used for attaching annotation objects to Cronet streams. When the stream finishes, the user can + * get Cronet metrics from {@link org.chromium.net.RequestFinishedInfo.Listener} with the same + * annotation object. + * + * The Object must not be null. + */ + public static final CallOptions.Key<Object> CRONET_ANNOTATION_KEY = + CallOptions.Key.of("cronet-annotation", null); +} diff --git a/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java new file mode 100644 index 000000000..13e7d4a83 --- /dev/null +++ b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java @@ -0,0 +1,179 @@ +/* + * Copyright 2016, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.cronet; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Attributes; +import io.grpc.ExperimentalApi; +import io.grpc.NameResolver; +import io.grpc.internal.AbstractManagedChannelImplBuilder; +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SharedResourceHolder; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; +import org.chromium.net.BidirectionalStream; +import org.chromium.net.CronetEngine; +import org.chromium.net.ExperimentalCronetEngine; + +/** Convenience class for building channels with the cronet transport. */ +@ExperimentalApi("There is no plan to make this API stable, given transport API instability") +public class CronetChannelBuilder extends + AbstractManagedChannelImplBuilder<CronetChannelBuilder> { + + /** BidirectionalStream.Builder factory used for getting the gRPC BidirectionalStream. */ + public static abstract class StreamBuilderFactory { + public abstract BidirectionalStream.Builder newBidirectionalStreamBuilder( + String url, BidirectionalStream.Callback callback, Executor executor); + } + + /** Creates a new builder for the given server host, port and CronetEngine. */ + public static CronetChannelBuilder forAddress( + String host, int port, final CronetEngine cronetEngine) { + Preconditions.checkNotNull(cronetEngine, "cronetEngine"); + return new CronetChannelBuilder( + host, + port, + new StreamBuilderFactory() { + @Override + public BidirectionalStream.Builder newBidirectionalStreamBuilder( + String url, BidirectionalStream.Callback callback, Executor executor) { + return ((ExperimentalCronetEngine) cronetEngine) + .newBidirectionalStreamBuilder(url, callback, executor); + } + }); + } + + /** Creates a new builder for the given server host, port and StreamBuilderFactory. */ + public static CronetChannelBuilder forAddress( + String host, int port, StreamBuilderFactory streamFactory) { + return new CronetChannelBuilder(host, port, streamFactory); + } + + /** + * Always fails. Call {@link #forAddress(String, int, CronetEngine)} instead. + */ + public static CronetChannelBuilder forTarget(String target) { + throw new UnsupportedOperationException("call forAddress() instead"); + } + + /** + * Always fails. Call {@link #forAddress(String, int, CronetEngine)} instead. + */ + public static CronetChannelBuilder forAddress(String name, int port) { + throw new UnsupportedOperationException("call forAddress(String, int, CronetEngine) instead"); + } + + private boolean alwaysUsePut = false; + + private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; + + private StreamBuilderFactory streamFactory; + + private CronetChannelBuilder(String host, int port, StreamBuilderFactory streamFactory) { + super( + InetSocketAddress.createUnresolved(host, port), + GrpcUtil.authorityFromHostAndPort(host, port)); + this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory"); + } + + /** + * Sets the maximum message size allowed to be received on the channel. If not called, + * defaults to {@link io.grpc.internal.GrpcUtil#DEFAULT_MAX_MESSAGE_SIZE}. + */ + public final CronetChannelBuilder maxMessageSize(int maxMessageSize) { + checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0"); + this.maxMessageSize = maxMessageSize; + return this; + } + + /** + * Sets the Cronet channel to always use PUT instead of POST. Defaults to false. + */ + public final CronetChannelBuilder alwaysUsePut(boolean enable) { + this.alwaysUsePut = enable; + return this; + } + + /** + * Not supported for building cronet channel. + */ + @Override + public final CronetChannelBuilder usePlaintext(boolean skipNegotiation) { + throw new IllegalArgumentException("Plaintext not currently supported"); + } + + @Override + protected final ClientTransportFactory buildTransportFactory() { + return new CronetTransportFactory(streamFactory, MoreExecutors.directExecutor(), + maxMessageSize, alwaysUsePut); + } + + @Override + protected Attributes getNameResolverParams() { + return Attributes.newBuilder() + .set(NameResolver.Factory.PARAMS_DEFAULT_PORT, GrpcUtil.DEFAULT_PORT_SSL).build(); + } + + @VisibleForTesting + static class CronetTransportFactory implements ClientTransportFactory { + private final ScheduledExecutorService timeoutService = + SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); + private final Executor executor; + private final int maxMessageSize; + private final boolean alwaysUsePut; + private final StreamBuilderFactory streamFactory; + + private CronetTransportFactory( + StreamBuilderFactory streamFactory, + Executor executor, + int maxMessageSize, + boolean alwaysUsePut) { + this.maxMessageSize = maxMessageSize; + this.alwaysUsePut = alwaysUsePut; + this.streamFactory = streamFactory; + this.executor = Preconditions.checkNotNull(executor, "executor"); + } + + @Override + public ConnectionClientTransport newClientTransport(SocketAddress addr, String authority, + @Nullable String userAgent) { + InetSocketAddress inetSocketAddr = (InetSocketAddress) addr; + return new CronetClientTransport(streamFactory, inetSocketAddr, authority, userAgent, + executor, maxMessageSize, alwaysUsePut); + } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return timeoutService; + } + + @Override + public void close() { + SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService); + } + } +} diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java new file mode 100644 index 000000000..2240772f7 --- /dev/null +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java @@ -0,0 +1,523 @@ +/* + * Copyright 2016, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.cronet; + +import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY; +import static io.grpc.internal.GrpcUtil.TE_HEADER; +import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY; + +// TODO(ericgribkoff): Consider changing from android.util.Log to java logging. +import android.util.Log; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.io.BaseEncoding; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory; +import io.grpc.internal.AbstractClientStream; +import io.grpc.internal.Http2ClientStreamTransportState; +import io.grpc.internal.ReadableBuffers; +import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.WritableBuffer; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.TransportFrameUtil; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Executor; +import java.util.Map; +import java.util.List; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import org.chromium.net.BidirectionalStream; +import org.chromium.net.CronetException; +import org.chromium.net.ExperimentalBidirectionalStream; +import org.chromium.net.UrlResponseInfo; + +/** + * Client stream for the cronet transport. + */ +class CronetClientStream extends AbstractClientStream { + private static final int READ_BUFFER_CAPACITY = 4 * 1024; + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); + private static final String LOG_TAG = "grpc-java-cronet"; + private final String url; + private final String userAgent; + private final Executor executor; + private final Metadata headers; + private final CronetClientTransport transport; + private final Runnable startCallback; + @VisibleForTesting + final boolean idempotent; + private BidirectionalStream stream; + private final boolean delayRequestHeader; + private final Object annotation; + private final TransportState state; + private final Sink sink = new Sink(); + private StreamBuilderFactory streamFactory; + + CronetClientStream( + final String url, + @Nullable String userAgent, + Executor executor, + final Metadata headers, + CronetClientTransport transport, + Runnable startCallback, + Object lock, + int maxMessageSize, + boolean alwaysUsePut, + MethodDescriptor<?, ?> method, + StatsTraceContext statsTraceCtx, + CallOptions callOptions) { + super(new CronetWritableBufferAllocator(), statsTraceCtx, headers, method.isSafe()); + this.url = Preconditions.checkNotNull(url, "url"); + this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent"); + this.executor = Preconditions.checkNotNull(executor, "executor"); + this.headers = Preconditions.checkNotNull(headers, "headers"); + this.transport = Preconditions.checkNotNull(transport, "transport"); + this.startCallback = Preconditions.checkNotNull(startCallback, "startCallback"); + this.idempotent = method.isIdempotent() || alwaysUsePut; + // Only delay flushing header for unary rpcs. + this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY); + this.annotation = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATION_KEY); + this.state = new TransportState(maxMessageSize, statsTraceCtx, lock); + } + + @Override + protected TransportState transportState() { + return state; + } + + @Override + protected Sink abstractClientStreamSink() { + return sink; + } + + @Override + public void setAuthority(String authority) { + throw new UnsupportedOperationException("Cronet does not support overriding authority"); + } + + class Sink implements AbstractClientStream.Sink { + @Override + public void writeHeaders(Metadata metadata, byte[] payload) { + startCallback.run(); + + BidirectionalStreamCallback callback = new BidirectionalStreamCallback(); + String path = url; + if (payload != null) { + path += "?" + BaseEncoding.base64().encode(payload); + } + BidirectionalStream.Builder builder = + streamFactory.newBidirectionalStreamBuilder(path, callback, executor); + if (payload != null) { + builder.setHttpMethod("GET"); + } else if (idempotent) { + builder.setHttpMethod("PUT"); + } + if (delayRequestHeader) { + builder.delayRequestHeadersUntilFirstFlush(true); + } + if (annotation != null) { + ((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(annotation); + } + setGrpcHeaders(builder); + stream = builder.build(); + stream.start(); + } + + @Override + public void writeFrame(WritableBuffer buffer, boolean endOfStream, boolean flush) { + synchronized (state.lock) { + if (state.cancelSent) { + return; + } + ByteBuffer byteBuffer; + if (buffer != null) { + byteBuffer = ((CronetWritableBuffer) buffer).buffer(); + byteBuffer.flip(); + } else { + byteBuffer = EMPTY_BUFFER; + } + onSendingBytes(byteBuffer.remaining()); + if (!state.streamReady) { + state.enqueuePendingData(new PendingData(byteBuffer, endOfStream, flush)); + } else { + streamWrite(byteBuffer, endOfStream, flush); + } + } + } + + @Override + public void request(final int numMessages) { + synchronized (state.lock) { + state.requestMessagesFromDeframer(numMessages); + } + } + + @Override + public void cancel(Status reason) { + synchronized (state.lock) { + if (state.cancelSent) { + return; + } + state.cancelSent = true; + state.cancelReason = reason; + state.clearPendingData(); + if (stream != null) { + // Will report stream finish when BidirectionalStreamCallback.onCanceled is called. + stream.cancel(); + } else { + transport.finishStream(CronetClientStream.this, reason); + } + } + } + } + + class TransportState extends Http2ClientStreamTransportState { + private final Object lock; + @GuardedBy("lock") + private Queue<PendingData> pendingData = new LinkedList<PendingData>(); + @GuardedBy("lock") + private boolean streamReady; + @GuardedBy("lock") + private boolean cancelSent = false; + @GuardedBy("lock") + private int bytesPendingProcess; + @GuardedBy("lock") + private Status cancelReason; + @GuardedBy("lock") + private boolean readClosed; + + public TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock) { + super(maxMessageSize, statsTraceCtx); + this.lock = Preconditions.checkNotNull(lock, "lock"); + } + + @GuardedBy("lock") + public void start(StreamBuilderFactory factory) { + streamFactory = factory; + } + + @GuardedBy("lock") + @Override + protected void onStreamAllocated() { + super.onStreamAllocated(); + } + + @GuardedBy("lock") + @Override + protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) { + stream.cancel(); + transportReportStatus(status, stopDelivery, trailers); + } + + @GuardedBy("lock") + @Override + public void deframeFailed(Throwable cause) { + http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata()); + } + + @Override + public void runOnTransportThread(final Runnable r) { + synchronized (lock) { + r.run(); + } + } + + @GuardedBy("lock") + @Override + public void bytesRead(int processedBytes) { + bytesPendingProcess -= processedBytes; + if (bytesPendingProcess == 0 && !readClosed) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "BidirectionalStream.read"); + } + stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); + } + } + + @GuardedBy("lock") + private void transportHeadersReceived(Metadata metadata, boolean endOfStream) { + if (endOfStream) { + transportTrailersReceived(metadata); + } else { + transportHeadersReceived(metadata); + } + } + + @GuardedBy("lock") + private void transportDataReceived(ByteBuffer buffer, boolean endOfStream) { + bytesPendingProcess += buffer.remaining(); + super.transportDataReceived(ReadableBuffers.wrap(buffer), endOfStream); + } + + @GuardedBy("lock") + private void clearPendingData() { + for (PendingData data : pendingData) { + data.buffer.clear(); + } + pendingData.clear(); + } + + @GuardedBy("lock") + private void enqueuePendingData(PendingData data) { + pendingData.add(data); + } + + @GuardedBy("lock") + private void writeAllPendingData() { + for (PendingData data : pendingData) { + streamWrite(data.buffer, data.endOfStream, data.flush); + } + pendingData.clear(); + } + } + + // TODO(ericgribkoff): move header related method to a common place like GrpcUtil. + private static boolean isApplicationHeader(String key) { + // Don't allow reserved non HTTP/2 pseudo headers to be added + // HTTP/2 headers can not be created as keys because Header.Key disallows the ':' character. + return !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key) + && !USER_AGENT_KEY.name().equalsIgnoreCase(key) + && !TE_HEADER.name().equalsIgnoreCase(key); + } + + private void setGrpcHeaders(BidirectionalStream.Builder builder) { + // Psuedo-headers are set by cronet. + // All non-pseudo headers must come after pseudo headers. + // TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed. + builder.addHeader(USER_AGENT_KEY.name(), userAgent); + builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC); + builder.addHeader("te", GrpcUtil.TE_TRAILERS); + + // Now add any application-provided headers. + // TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between + // String and byte array. + byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers); + for (int i = 0; i < serializedHeaders.length; i += 2) { + String key = new String(serializedHeaders[i], Charset.forName("UTF-8")); + // TODO(ericgribkoff): log an error or throw an exception + if (isApplicationHeader(key)) { + String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8")); + builder.addHeader(key, value); + } + } + } + + private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "BidirectionalStream.write"); + } + stream.write(buffer, endOfStream); + if (flush) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "BidirectionalStream.flush"); + } + stream.flush(); + } + } + + private void finishStream(Status status) { + transport.finishStream(this, status); + } + + @Override + public Attributes getAttributes() { + return Attributes.EMPTY; + } + + class BidirectionalStreamCallback extends BidirectionalStream.Callback { + private List<Map.Entry<String, String>> trailerList; + + @Override + public void onStreamReady(BidirectionalStream stream) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "onStreamReady"); + } + synchronized (state.lock) { + // Now that the stream is ready, call the listener's onReady callback if + // appropriate. + state.onStreamAllocated(); + state.streamReady = true; + state.writeAllPendingData(); + } + } + + @Override + public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "onResponseHeadersReceived. Header=" + info.getAllHeadersAsList()); + Log.v(LOG_TAG, "BidirectionalStream.read"); + } + reportHeaders(info.getAllHeadersAsList(), false); + stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY)); + } + + @Override + public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info, + ByteBuffer buffer, boolean endOfStream) { + buffer.flip(); + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "onReadCompleted. Size=" + buffer.remaining()); + } + + synchronized (state.lock) { + state.readClosed = endOfStream; + // The endOfStream in gRPC has a different meaning so we always call transportDataReceived + // with endOfStream=false. + if (buffer.remaining() != 0) { + state.transportDataReceived(buffer, false); + } + } + if (endOfStream && trailerList != null) { + // Process trailers if we have already received any. + reportHeaders(trailerList, true); + } + } + + @Override + public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info, + ByteBuffer buffer, boolean endOfStream) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "onWriteCompleted"); + } + synchronized (state.lock) { + state.onSentBytes(buffer.position()); + } + } + + @Override + public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info, + UrlResponseInfo.HeaderBlock trailers) { + processTrailers(trailers.getAsList()); + } + + // We need this method because UrlResponseInfo.HeaderBlock is a final class and cannot be + // mocked. + @VisibleForTesting + void processTrailers(List<Map.Entry<String, String>> trailerList) { + this.trailerList = trailerList; + boolean readClosed; + synchronized (state.lock) { + readClosed = state.readClosed; + } + if (readClosed) { + // There's no pending onReadCompleted callback so we can report trailers now. + reportHeaders(trailerList, true); + } + // Otherwise report trailers in onReadCompleted, or onSucceeded. + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "onResponseTrailersReceived. Trailer=" + trailerList.toString()); + } + } + + @Override + public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "onSucceeded"); + } + + if (!haveTrailersBeenReported()) { + if (trailerList != null) { + reportHeaders(trailerList, true); + } else if (info != null) { + reportHeaders(info.getAllHeadersAsList(), true); + } else { + throw new AssertionError("No response header or trailer"); + } + } + finishStream(toGrpcStatus(info)); + } + + @Override + public void onFailed(BidirectionalStream stream, UrlResponseInfo info, + CronetException error) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "onFailed"); + } + finishStream(Status.UNAVAILABLE.withCause(error)); + } + + @Override + public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) { + if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) { + Log.v(LOG_TAG, "onCanceled"); + } + Status status; + synchronized (state.lock) { + if (state.cancelReason != null) { + status = state.cancelReason; + } else if (info != null) { + status = toGrpcStatus(info); + } else { + status = Status.CANCELLED; + } + } + finishStream(status); + } + + private void reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream) { + // TODO(ericgribkoff): create new utility methods to eliminate all these conversions + List<String> headerList = new ArrayList<String>(); + for (Map.Entry<String, String> entry : headers) { + headerList.add(entry.getKey()); + headerList.add(entry.getValue()); + } + + byte[][] headerValues = new byte[headerList.size()][]; + for (int i = 0; i < headerList.size(); i += 2) { + headerValues[i] = headerList.get(i).getBytes(Charset.forName("UTF-8")); + headerValues[i + 1] = headerList.get(i + 1).getBytes(Charset.forName("UTF-8")); + } + Metadata metadata = + InternalMetadata.newMetadata(TransportFrameUtil.toRawSerializedHeaders(headerValues)); + synchronized (state.lock) { + // There's no pending onReadCompleted callback so we can report trailers now. + state.transportHeadersReceived(metadata, endOfStream); + } + } + + private boolean haveTrailersBeenReported() { + synchronized (state.lock) { + return trailerList != null && state.readClosed; + } + } + + private Status toGrpcStatus(UrlResponseInfo info) { + return GrpcUtil.httpStatusToGrpcStatus(info.getHttpStatusCode()); + } + } + + private static class PendingData { + ByteBuffer buffer; + boolean endOfStream; + boolean flush; + + PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush) { + this.buffer = buffer; + this.endOfStream = endOfStream; + this.flush = flush; + } + } +} diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java new file mode 100644 index 000000000..65d7b304e --- /dev/null +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java @@ -0,0 +1,245 @@ +/* + * Copyright 2016, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.cronet; + +import com.google.common.base.Preconditions; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory; +import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.LogId; +import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.WithLogId; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +/** + * A cronet-based {@link ConnectionClientTransport} implementation. + */ +class CronetClientTransport implements ConnectionClientTransport, WithLogId { + private final LogId logId = LogId.allocate(getClass().getName()); + private final InetSocketAddress address; + private final String authority; + private final String userAgent; + private Listener listener; + private final Object lock = new Object(); + @GuardedBy("lock") + private final Set<CronetClientStream> streams = + new HashSet<CronetClientStream>(); + private final Executor executor; + private final int maxMessageSize; + private final boolean alwaysUsePut; + // Indicates the transport is in go-away state: no new streams will be processed, + // but existing streams may continue. + @GuardedBy("lock") + private boolean goAway; + // Used to indicate the special phase while we are going to enter go-away state but before + // goAway is turned to true, see the comment at where this is set about why it is needed. + @GuardedBy("lock") + private boolean startedGoAway; + @GuardedBy("lock") + private Status goAwayStatus; + @GuardedBy("lock") + private boolean stopped; + @GuardedBy("lock") + // Whether this transport has started. + private boolean started; + private StreamBuilderFactory streamFactory; + + CronetClientTransport( + StreamBuilderFactory streamFactory, + InetSocketAddress address, + String authority, + @Nullable String userAgent, + Executor executor, + int maxMessageSize, + boolean alwaysUsePut) { + this.address = Preconditions.checkNotNull(address, "address"); + this.authority = authority; + this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent); + this.maxMessageSize = maxMessageSize; + this.alwaysUsePut = alwaysUsePut; + this.executor = Preconditions.checkNotNull(executor, "executor"); + this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory"); + } + + @Override + public CronetClientStream newStream(final MethodDescriptor<?, ?> method, final Metadata headers, + final CallOptions callOptions) { + Preconditions.checkNotNull(method, "method"); + Preconditions.checkNotNull(headers, "headers"); + + final String defaultPath = "/" + method.getFullMethodName(); + final String url = "https://" + authority + defaultPath; + + final StatsTraceContext statsTraceCtx = + StatsTraceContext.newClientContext(callOptions, headers); + class StartCallback implements Runnable { + final CronetClientStream clientStream = new CronetClientStream( + url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize, + alwaysUsePut, method, statsTraceCtx, callOptions); + + @Override + public void run() { + synchronized (lock) { + if (goAway) { + clientStream.transportState().transportReportStatus(goAwayStatus, true, new Metadata()); + } else if (started) { + startStream(clientStream); + } else { + throw new AssertionError("Transport is not started"); + } + } + } + } + + return new StartCallback().clientStream; + } + + @GuardedBy("lock") + private void startStream(CronetClientStream stream) { + streams.add(stream); + stream.transportState().start(streamFactory); + } + + @Override + public Runnable start(Listener listener) { + this.listener = Preconditions.checkNotNull(listener, "listener"); + synchronized (lock) { + started = true; + } + return new Runnable() { + @Override + public void run() { + // Listener callbacks should not be called simultaneously + CronetClientTransport.this.listener.transportReady(); + } + }; + } + + @Override + public String toString() { + return super.toString() + "(" + address + ")"; + } + + public void shutdown() { + shutdown(Status.UNAVAILABLE.withDescription("Transport stopped")); + } + + @Override + public void shutdown(Status status) { + synchronized (lock) { + if (goAway) { + return; + } + } + + startGoAway(status); + } + + @Override + public void shutdownNow(Status status) { + shutdown(status); + ArrayList<CronetClientStream> streamsCopy; + synchronized (lock) { + // A copy is always necessary since cancel() can call finishStream() which calls + // streams.remove() + streamsCopy = new ArrayList<CronetClientStream>(streams); + } + for (int i = 0; i < streamsCopy.size(); i++) { + // Avoid deadlock by calling into stream without lock held + streamsCopy.get(i).cancel(status); + } + stopIfNecessary(); + } + + @Override + public Attributes getAttributes() { + // TODO(zhangkun83): fill channel security attributes + return Attributes.EMPTY; + } + + private void startGoAway(Status status) { + synchronized (lock) { + if (startedGoAway) { + // Another go-away is in progress, ignore this one. + return; + } + // We use startedGoAway here instead of goAway, because once the goAway becomes true, other + // thread in stopIfNecessary() may stop the transport and cause the + // listener.transportTerminated() be called before listener.transportShutdown(). + startedGoAway = true; + } + + listener.transportShutdown(status); + + synchronized (lock) { + goAway = true; + goAwayStatus = status; + } + + stopIfNecessary(); + } + + @Override + public void ping(final PingCallback callback, Executor executor) { + // TODO(ericgribkoff): depend on cronet implemenetation + throw new UnsupportedOperationException(); + } + + @Override + public LogId getLogId() { + return logId; + } + + /** + * When the transport is in goAway state, we should stop it once all active streams finish. + */ + void stopIfNecessary() { + synchronized (lock) { + if (goAway && !stopped && streams.size() == 0) { + stopped = true; + } else { + return; + } + } + listener.transportTerminated(); + } + + void finishStream(CronetClientStream stream, Status status) { + synchronized (lock) { + if (streams.remove(stream)) { + boolean isCancelled = (status.getCode() == Code.CANCELLED + || status.getCode() == Code.DEADLINE_EXCEEDED); + stream.transportState().transportReportStatus(status, isCancelled, new Metadata()); + } else { + return; + } + } + stopIfNecessary(); + } +} diff --git a/cronet/src/main/java/io/grpc/cronet/CronetWritableBuffer.java b/cronet/src/main/java/io/grpc/cronet/CronetWritableBuffer.java new file mode 100644 index 000000000..f0e713a2b --- /dev/null +++ b/cronet/src/main/java/io/grpc/cronet/CronetWritableBuffer.java @@ -0,0 +1,57 @@ +/* + * Copyright 2016, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.cronet; + +import com.google.common.base.Preconditions; +import io.grpc.internal.WritableBuffer; +import java.nio.ByteBuffer; + +class CronetWritableBuffer implements WritableBuffer { + private final ByteBuffer buffer; + + public CronetWritableBuffer(ByteBuffer buffer, int capacity) { + this.buffer = Preconditions.checkNotNull(buffer, "buffer"); + } + + @Override + public void write(byte[] src, int srcIndex, int length) { + buffer.put(src, srcIndex, length); + } + + @Override + public void write(byte b) { + buffer.put(b); + } + + @Override + public int writableBytes() { + return buffer.remaining(); + } + + @Override + public int readableBytes() { + return buffer.position(); + } + + @Override + public void release() { + } + + ByteBuffer buffer() { + return buffer; + } +} diff --git a/cronet/src/main/java/io/grpc/cronet/CronetWritableBufferAllocator.java b/cronet/src/main/java/io/grpc/cronet/CronetWritableBufferAllocator.java new file mode 100644 index 000000000..c373b03eb --- /dev/null +++ b/cronet/src/main/java/io/grpc/cronet/CronetWritableBufferAllocator.java @@ -0,0 +1,41 @@ +/* + * Copyright 2016, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.cronet; + +import io.grpc.internal.WritableBuffer; +import io.grpc.internal.WritableBufferAllocator; +import java.nio.ByteBuffer; + +/** + * The default allocator for {@link CronetWritableBuffer}s used by the Cronet transport. + */ +class CronetWritableBufferAllocator implements WritableBufferAllocator { + // Set the maximum buffer size to 1MB + private static final int MAX_BUFFER = 1024 * 1024; + + /** + * Construct a new instance. + */ + CronetWritableBufferAllocator() { + } + + @Override + public WritableBuffer allocate(int capacityHint) { + capacityHint = Math.min(MAX_BUFFER, capacityHint); + return new CronetWritableBuffer(ByteBuffer.allocateDirect(capacityHint), capacityHint); + } +} |