aboutsummaryrefslogtreecommitdiff
path: root/cronet
diff options
context:
space:
mode:
authorEric Gribkoff <ericgribkoff@google.com>2017-10-06 11:18:03 -0700
committerGitHub <noreply@github.com>2017-10-06 11:18:03 -0700
commit4d67c3d63fa93e14f9c6fe2e05a156e9471006e8 (patch)
tree44c2d7b3b62048c447cebc22684ee43454190c01 /cronet
parentb07c70a09f2e45e578d315d7aa9f0f9fedda6d1a (diff)
downloadgrpc-grpc-java-4d67c3d63fa93e14f9c6fe2e05a156e9471006e8.tar.gz
cronet: open-source experimental Cronet transport code
Diffstat (limited to 'cronet')
-rw-r--r--cronet/README.md46
-rw-r--r--cronet/src/main/java/io/grpc/cronet/CronetCallOptions.java34
-rw-r--r--cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java179
-rw-r--r--cronet/src/main/java/io/grpc/cronet/CronetClientStream.java523
-rw-r--r--cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java245
-rw-r--r--cronet/src/main/java/io/grpc/cronet/CronetWritableBuffer.java57
-rw-r--r--cronet/src/main/java/io/grpc/cronet/CronetWritableBufferAllocator.java41
7 files changed, 1125 insertions, 0 deletions
diff --git a/cronet/README.md b/cronet/README.md
new file mode 100644
index 000000000..1103f31f6
--- /dev/null
+++ b/cronet/README.md
@@ -0,0 +1,46 @@
+gRPC Cronet Transport
+========================
+
+**EXPERIMENTAL:** *gRPC's Cronet transport is an experimental API, and is not
+yet integrated with our build system. Using Cronet with gRPC requires manually
+integrating the Cronet libraries and the gRPC code in this directory into your
+Android application.*
+
+This code enables using the [Chromium networking stack
+(Cronet)](https://chromium.googlesource.com/chromium/src/+/master/components/cronet)
+as the transport layer for gRPC on Android. This lets your Android app make
+RPCs using the same networking stack as used in the Chrome browser.
+
+Some advantages of using Cronet with gRPC:
+* Bundles an OpenSSL implementation, enabling TLS connections even on older
+ versions of Android without additional configuration
+* Robust to Android network connectivity changes
+* Support for [QUIC](https://www.chromium.org/quic)
+
+Cronet jars are not currently available on Maven. The instructions at
+https://github.com/GoogleChrome/cronet-sample/blob/master/README.md describe
+how to manually download the Cronet binaries and add them to your Android
+application. You will also need to copy the gRPC source files contained in this
+directory into your application's code, as we do not currently provide a
+`grpc-cronet` dependency.
+
+To use Cronet, you must have the `ACCESS_NETWORK_STATE` permission set in
+`AndroidManifest.xml`:
+
+```
+<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
+```
+
+Once the above steps are completed, you can create a gRPC Cronet channel as
+follows:
+
+```
+import io.grpc.cronet.CronetChannelBuilder;
+import org.chromium.net.ExperimentalCronetEngine;
+
+...
+
+ExperimentalCronetEngine engine =
+ new ExperimentalCronetEngine.Builder(context /* Android Context */).build();
+ManagedChannel channel = CronetChannelBuilder.forAddress("localhost", 8080, engine).build();
+```
diff --git a/cronet/src/main/java/io/grpc/cronet/CronetCallOptions.java b/cronet/src/main/java/io/grpc/cronet/CronetCallOptions.java
new file mode 100644
index 000000000..a3d43f52f
--- /dev/null
+++ b/cronet/src/main/java/io/grpc/cronet/CronetCallOptions.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2017, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.cronet;
+
+import io.grpc.CallOptions;
+
+/** Call options for use with the Cronet transport. */
+public final class CronetCallOptions {
+ private CronetCallOptions() {}
+
+ /**
+ * Used for attaching annotation objects to Cronet streams. When the stream finishes, the user can
+ * get Cronet metrics from {@link org.chromium.net.RequestFinishedInfo.Listener} with the same
+ * annotation object.
+ *
+ * The Object must not be null.
+ */
+ public static final CallOptions.Key<Object> CRONET_ANNOTATION_KEY =
+ CallOptions.Key.of("cronet-annotation", null);
+}
diff --git a/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java
new file mode 100644
index 000000000..13e7d4a83
--- /dev/null
+++ b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2016, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.cronet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Attributes;
+import io.grpc.ExperimentalApi;
+import io.grpc.NameResolver;
+import io.grpc.internal.AbstractManagedChannelImplBuilder;
+import io.grpc.internal.ClientTransportFactory;
+import io.grpc.internal.ConnectionClientTransport;
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.SharedResourceHolder;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import javax.annotation.Nullable;
+import org.chromium.net.BidirectionalStream;
+import org.chromium.net.CronetEngine;
+import org.chromium.net.ExperimentalCronetEngine;
+
+/** Convenience class for building channels with the cronet transport. */
+@ExperimentalApi("There is no plan to make this API stable, given transport API instability")
+public class CronetChannelBuilder extends
+ AbstractManagedChannelImplBuilder<CronetChannelBuilder> {
+
+ /** BidirectionalStream.Builder factory used for getting the gRPC BidirectionalStream. */
+ public static abstract class StreamBuilderFactory {
+ public abstract BidirectionalStream.Builder newBidirectionalStreamBuilder(
+ String url, BidirectionalStream.Callback callback, Executor executor);
+ }
+
+ /** Creates a new builder for the given server host, port and CronetEngine. */
+ public static CronetChannelBuilder forAddress(
+ String host, int port, final CronetEngine cronetEngine) {
+ Preconditions.checkNotNull(cronetEngine, "cronetEngine");
+ return new CronetChannelBuilder(
+ host,
+ port,
+ new StreamBuilderFactory() {
+ @Override
+ public BidirectionalStream.Builder newBidirectionalStreamBuilder(
+ String url, BidirectionalStream.Callback callback, Executor executor) {
+ return ((ExperimentalCronetEngine) cronetEngine)
+ .newBidirectionalStreamBuilder(url, callback, executor);
+ }
+ });
+ }
+
+ /** Creates a new builder for the given server host, port and StreamBuilderFactory. */
+ public static CronetChannelBuilder forAddress(
+ String host, int port, StreamBuilderFactory streamFactory) {
+ return new CronetChannelBuilder(host, port, streamFactory);
+ }
+
+ /**
+ * Always fails. Call {@link #forAddress(String, int, CronetEngine)} instead.
+ */
+ public static CronetChannelBuilder forTarget(String target) {
+ throw new UnsupportedOperationException("call forAddress() instead");
+ }
+
+ /**
+ * Always fails. Call {@link #forAddress(String, int, CronetEngine)} instead.
+ */
+ public static CronetChannelBuilder forAddress(String name, int port) {
+ throw new UnsupportedOperationException("call forAddress(String, int, CronetEngine) instead");
+ }
+
+ private boolean alwaysUsePut = false;
+
+ private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+
+ private StreamBuilderFactory streamFactory;
+
+ private CronetChannelBuilder(String host, int port, StreamBuilderFactory streamFactory) {
+ super(
+ InetSocketAddress.createUnresolved(host, port),
+ GrpcUtil.authorityFromHostAndPort(host, port));
+ this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory");
+ }
+
+ /**
+ * Sets the maximum message size allowed to be received on the channel. If not called,
+ * defaults to {@link io.grpc.internal.GrpcUtil#DEFAULT_MAX_MESSAGE_SIZE}.
+ */
+ public final CronetChannelBuilder maxMessageSize(int maxMessageSize) {
+ checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
+ this.maxMessageSize = maxMessageSize;
+ return this;
+ }
+
+ /**
+ * Sets the Cronet channel to always use PUT instead of POST. Defaults to false.
+ */
+ public final CronetChannelBuilder alwaysUsePut(boolean enable) {
+ this.alwaysUsePut = enable;
+ return this;
+ }
+
+ /**
+ * Not supported for building cronet channel.
+ */
+ @Override
+ public final CronetChannelBuilder usePlaintext(boolean skipNegotiation) {
+ throw new IllegalArgumentException("Plaintext not currently supported");
+ }
+
+ @Override
+ protected final ClientTransportFactory buildTransportFactory() {
+ return new CronetTransportFactory(streamFactory, MoreExecutors.directExecutor(),
+ maxMessageSize, alwaysUsePut);
+ }
+
+ @Override
+ protected Attributes getNameResolverParams() {
+ return Attributes.newBuilder()
+ .set(NameResolver.Factory.PARAMS_DEFAULT_PORT, GrpcUtil.DEFAULT_PORT_SSL).build();
+ }
+
+ @VisibleForTesting
+ static class CronetTransportFactory implements ClientTransportFactory {
+ private final ScheduledExecutorService timeoutService =
+ SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
+ private final Executor executor;
+ private final int maxMessageSize;
+ private final boolean alwaysUsePut;
+ private final StreamBuilderFactory streamFactory;
+
+ private CronetTransportFactory(
+ StreamBuilderFactory streamFactory,
+ Executor executor,
+ int maxMessageSize,
+ boolean alwaysUsePut) {
+ this.maxMessageSize = maxMessageSize;
+ this.alwaysUsePut = alwaysUsePut;
+ this.streamFactory = streamFactory;
+ this.executor = Preconditions.checkNotNull(executor, "executor");
+ }
+
+ @Override
+ public ConnectionClientTransport newClientTransport(SocketAddress addr, String authority,
+ @Nullable String userAgent) {
+ InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
+ return new CronetClientTransport(streamFactory, inetSocketAddr, authority, userAgent,
+ executor, maxMessageSize, alwaysUsePut);
+ }
+
+ @Override
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return timeoutService;
+ }
+
+ @Override
+ public void close() {
+ SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
+ }
+ }
+}
diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java
new file mode 100644
index 000000000..2240772f7
--- /dev/null
+++ b/cronet/src/main/java/io/grpc/cronet/CronetClientStream.java
@@ -0,0 +1,523 @@
+/*
+ * Copyright 2016, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.cronet;
+
+import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
+import static io.grpc.internal.GrpcUtil.TE_HEADER;
+import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
+
+// TODO(ericgribkoff): Consider changing from android.util.Log to java logging.
+import android.util.Log;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.BaseEncoding;
+import io.grpc.Attributes;
+import io.grpc.CallOptions;
+import io.grpc.InternalMetadata;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
+import io.grpc.internal.AbstractClientStream;
+import io.grpc.internal.Http2ClientStreamTransportState;
+import io.grpc.internal.ReadableBuffers;
+import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.WritableBuffer;
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.TransportFrameUtil;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import java.util.Map;
+import java.util.List;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.chromium.net.BidirectionalStream;
+import org.chromium.net.CronetException;
+import org.chromium.net.ExperimentalBidirectionalStream;
+import org.chromium.net.UrlResponseInfo;
+
+/**
+ * Client stream for the cronet transport.
+ */
+class CronetClientStream extends AbstractClientStream {
+ private static final int READ_BUFFER_CAPACITY = 4 * 1024;
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
+ private static final String LOG_TAG = "grpc-java-cronet";
+ private final String url;
+ private final String userAgent;
+ private final Executor executor;
+ private final Metadata headers;
+ private final CronetClientTransport transport;
+ private final Runnable startCallback;
+ @VisibleForTesting
+ final boolean idempotent;
+ private BidirectionalStream stream;
+ private final boolean delayRequestHeader;
+ private final Object annotation;
+ private final TransportState state;
+ private final Sink sink = new Sink();
+ private StreamBuilderFactory streamFactory;
+
+ CronetClientStream(
+ final String url,
+ @Nullable String userAgent,
+ Executor executor,
+ final Metadata headers,
+ CronetClientTransport transport,
+ Runnable startCallback,
+ Object lock,
+ int maxMessageSize,
+ boolean alwaysUsePut,
+ MethodDescriptor<?, ?> method,
+ StatsTraceContext statsTraceCtx,
+ CallOptions callOptions) {
+ super(new CronetWritableBufferAllocator(), statsTraceCtx, headers, method.isSafe());
+ this.url = Preconditions.checkNotNull(url, "url");
+ this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent");
+ this.executor = Preconditions.checkNotNull(executor, "executor");
+ this.headers = Preconditions.checkNotNull(headers, "headers");
+ this.transport = Preconditions.checkNotNull(transport, "transport");
+ this.startCallback = Preconditions.checkNotNull(startCallback, "startCallback");
+ this.idempotent = method.isIdempotent() || alwaysUsePut;
+ // Only delay flushing header for unary rpcs.
+ this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY);
+ this.annotation = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATION_KEY);
+ this.state = new TransportState(maxMessageSize, statsTraceCtx, lock);
+ }
+
+ @Override
+ protected TransportState transportState() {
+ return state;
+ }
+
+ @Override
+ protected Sink abstractClientStreamSink() {
+ return sink;
+ }
+
+ @Override
+ public void setAuthority(String authority) {
+ throw new UnsupportedOperationException("Cronet does not support overriding authority");
+ }
+
+ class Sink implements AbstractClientStream.Sink {
+ @Override
+ public void writeHeaders(Metadata metadata, byte[] payload) {
+ startCallback.run();
+
+ BidirectionalStreamCallback callback = new BidirectionalStreamCallback();
+ String path = url;
+ if (payload != null) {
+ path += "?" + BaseEncoding.base64().encode(payload);
+ }
+ BidirectionalStream.Builder builder =
+ streamFactory.newBidirectionalStreamBuilder(path, callback, executor);
+ if (payload != null) {
+ builder.setHttpMethod("GET");
+ } else if (idempotent) {
+ builder.setHttpMethod("PUT");
+ }
+ if (delayRequestHeader) {
+ builder.delayRequestHeadersUntilFirstFlush(true);
+ }
+ if (annotation != null) {
+ ((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(annotation);
+ }
+ setGrpcHeaders(builder);
+ stream = builder.build();
+ stream.start();
+ }
+
+ @Override
+ public void writeFrame(WritableBuffer buffer, boolean endOfStream, boolean flush) {
+ synchronized (state.lock) {
+ if (state.cancelSent) {
+ return;
+ }
+ ByteBuffer byteBuffer;
+ if (buffer != null) {
+ byteBuffer = ((CronetWritableBuffer) buffer).buffer();
+ byteBuffer.flip();
+ } else {
+ byteBuffer = EMPTY_BUFFER;
+ }
+ onSendingBytes(byteBuffer.remaining());
+ if (!state.streamReady) {
+ state.enqueuePendingData(new PendingData(byteBuffer, endOfStream, flush));
+ } else {
+ streamWrite(byteBuffer, endOfStream, flush);
+ }
+ }
+ }
+
+ @Override
+ public void request(final int numMessages) {
+ synchronized (state.lock) {
+ state.requestMessagesFromDeframer(numMessages);
+ }
+ }
+
+ @Override
+ public void cancel(Status reason) {
+ synchronized (state.lock) {
+ if (state.cancelSent) {
+ return;
+ }
+ state.cancelSent = true;
+ state.cancelReason = reason;
+ state.clearPendingData();
+ if (stream != null) {
+ // Will report stream finish when BidirectionalStreamCallback.onCanceled is called.
+ stream.cancel();
+ } else {
+ transport.finishStream(CronetClientStream.this, reason);
+ }
+ }
+ }
+ }
+
+ class TransportState extends Http2ClientStreamTransportState {
+ private final Object lock;
+ @GuardedBy("lock")
+ private Queue<PendingData> pendingData = new LinkedList<PendingData>();
+ @GuardedBy("lock")
+ private boolean streamReady;
+ @GuardedBy("lock")
+ private boolean cancelSent = false;
+ @GuardedBy("lock")
+ private int bytesPendingProcess;
+ @GuardedBy("lock")
+ private Status cancelReason;
+ @GuardedBy("lock")
+ private boolean readClosed;
+
+ public TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock) {
+ super(maxMessageSize, statsTraceCtx);
+ this.lock = Preconditions.checkNotNull(lock, "lock");
+ }
+
+ @GuardedBy("lock")
+ public void start(StreamBuilderFactory factory) {
+ streamFactory = factory;
+ }
+
+ @GuardedBy("lock")
+ @Override
+ protected void onStreamAllocated() {
+ super.onStreamAllocated();
+ }
+
+ @GuardedBy("lock")
+ @Override
+ protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
+ stream.cancel();
+ transportReportStatus(status, stopDelivery, trailers);
+ }
+
+ @GuardedBy("lock")
+ @Override
+ public void deframeFailed(Throwable cause) {
+ http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
+ }
+
+ @Override
+ public void runOnTransportThread(final Runnable r) {
+ synchronized (lock) {
+ r.run();
+ }
+ }
+
+ @GuardedBy("lock")
+ @Override
+ public void bytesRead(int processedBytes) {
+ bytesPendingProcess -= processedBytes;
+ if (bytesPendingProcess == 0 && !readClosed) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "BidirectionalStream.read");
+ }
+ stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
+ }
+ }
+
+ @GuardedBy("lock")
+ private void transportHeadersReceived(Metadata metadata, boolean endOfStream) {
+ if (endOfStream) {
+ transportTrailersReceived(metadata);
+ } else {
+ transportHeadersReceived(metadata);
+ }
+ }
+
+ @GuardedBy("lock")
+ private void transportDataReceived(ByteBuffer buffer, boolean endOfStream) {
+ bytesPendingProcess += buffer.remaining();
+ super.transportDataReceived(ReadableBuffers.wrap(buffer), endOfStream);
+ }
+
+ @GuardedBy("lock")
+ private void clearPendingData() {
+ for (PendingData data : pendingData) {
+ data.buffer.clear();
+ }
+ pendingData.clear();
+ }
+
+ @GuardedBy("lock")
+ private void enqueuePendingData(PendingData data) {
+ pendingData.add(data);
+ }
+
+ @GuardedBy("lock")
+ private void writeAllPendingData() {
+ for (PendingData data : pendingData) {
+ streamWrite(data.buffer, data.endOfStream, data.flush);
+ }
+ pendingData.clear();
+ }
+ }
+
+ // TODO(ericgribkoff): move header related method to a common place like GrpcUtil.
+ private static boolean isApplicationHeader(String key) {
+ // Don't allow reserved non HTTP/2 pseudo headers to be added
+ // HTTP/2 headers can not be created as keys because Header.Key disallows the ':' character.
+ return !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key)
+ && !USER_AGENT_KEY.name().equalsIgnoreCase(key)
+ && !TE_HEADER.name().equalsIgnoreCase(key);
+ }
+
+ private void setGrpcHeaders(BidirectionalStream.Builder builder) {
+ // Psuedo-headers are set by cronet.
+ // All non-pseudo headers must come after pseudo headers.
+ // TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed.
+ builder.addHeader(USER_AGENT_KEY.name(), userAgent);
+ builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
+ builder.addHeader("te", GrpcUtil.TE_TRAILERS);
+
+ // Now add any application-provided headers.
+ // TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between
+ // String and byte array.
+ byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers);
+ for (int i = 0; i < serializedHeaders.length; i += 2) {
+ String key = new String(serializedHeaders[i], Charset.forName("UTF-8"));
+ // TODO(ericgribkoff): log an error or throw an exception
+ if (isApplicationHeader(key)) {
+ String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8"));
+ builder.addHeader(key, value);
+ }
+ }
+ }
+
+ private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "BidirectionalStream.write");
+ }
+ stream.write(buffer, endOfStream);
+ if (flush) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "BidirectionalStream.flush");
+ }
+ stream.flush();
+ }
+ }
+
+ private void finishStream(Status status) {
+ transport.finishStream(this, status);
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return Attributes.EMPTY;
+ }
+
+ class BidirectionalStreamCallback extends BidirectionalStream.Callback {
+ private List<Map.Entry<String, String>> trailerList;
+
+ @Override
+ public void onStreamReady(BidirectionalStream stream) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "onStreamReady");
+ }
+ synchronized (state.lock) {
+ // Now that the stream is ready, call the listener's onReady callback if
+ // appropriate.
+ state.onStreamAllocated();
+ state.streamReady = true;
+ state.writeAllPendingData();
+ }
+ }
+
+ @Override
+ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "onResponseHeadersReceived. Header=" + info.getAllHeadersAsList());
+ Log.v(LOG_TAG, "BidirectionalStream.read");
+ }
+ reportHeaders(info.getAllHeadersAsList(), false);
+ stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
+ }
+
+ @Override
+ public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info,
+ ByteBuffer buffer, boolean endOfStream) {
+ buffer.flip();
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "onReadCompleted. Size=" + buffer.remaining());
+ }
+
+ synchronized (state.lock) {
+ state.readClosed = endOfStream;
+ // The endOfStream in gRPC has a different meaning so we always call transportDataReceived
+ // with endOfStream=false.
+ if (buffer.remaining() != 0) {
+ state.transportDataReceived(buffer, false);
+ }
+ }
+ if (endOfStream && trailerList != null) {
+ // Process trailers if we have already received any.
+ reportHeaders(trailerList, true);
+ }
+ }
+
+ @Override
+ public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info,
+ ByteBuffer buffer, boolean endOfStream) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "onWriteCompleted");
+ }
+ synchronized (state.lock) {
+ state.onSentBytes(buffer.position());
+ }
+ }
+
+ @Override
+ public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info,
+ UrlResponseInfo.HeaderBlock trailers) {
+ processTrailers(trailers.getAsList());
+ }
+
+ // We need this method because UrlResponseInfo.HeaderBlock is a final class and cannot be
+ // mocked.
+ @VisibleForTesting
+ void processTrailers(List<Map.Entry<String, String>> trailerList) {
+ this.trailerList = trailerList;
+ boolean readClosed;
+ synchronized (state.lock) {
+ readClosed = state.readClosed;
+ }
+ if (readClosed) {
+ // There's no pending onReadCompleted callback so we can report trailers now.
+ reportHeaders(trailerList, true);
+ }
+ // Otherwise report trailers in onReadCompleted, or onSucceeded.
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "onResponseTrailersReceived. Trailer=" + trailerList.toString());
+ }
+ }
+
+ @Override
+ public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "onSucceeded");
+ }
+
+ if (!haveTrailersBeenReported()) {
+ if (trailerList != null) {
+ reportHeaders(trailerList, true);
+ } else if (info != null) {
+ reportHeaders(info.getAllHeadersAsList(), true);
+ } else {
+ throw new AssertionError("No response header or trailer");
+ }
+ }
+ finishStream(toGrpcStatus(info));
+ }
+
+ @Override
+ public void onFailed(BidirectionalStream stream, UrlResponseInfo info,
+ CronetException error) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "onFailed");
+ }
+ finishStream(Status.UNAVAILABLE.withCause(error));
+ }
+
+ @Override
+ public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
+ if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
+ Log.v(LOG_TAG, "onCanceled");
+ }
+ Status status;
+ synchronized (state.lock) {
+ if (state.cancelReason != null) {
+ status = state.cancelReason;
+ } else if (info != null) {
+ status = toGrpcStatus(info);
+ } else {
+ status = Status.CANCELLED;
+ }
+ }
+ finishStream(status);
+ }
+
+ private void reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream) {
+ // TODO(ericgribkoff): create new utility methods to eliminate all these conversions
+ List<String> headerList = new ArrayList<String>();
+ for (Map.Entry<String, String> entry : headers) {
+ headerList.add(entry.getKey());
+ headerList.add(entry.getValue());
+ }
+
+ byte[][] headerValues = new byte[headerList.size()][];
+ for (int i = 0; i < headerList.size(); i += 2) {
+ headerValues[i] = headerList.get(i).getBytes(Charset.forName("UTF-8"));
+ headerValues[i + 1] = headerList.get(i + 1).getBytes(Charset.forName("UTF-8"));
+ }
+ Metadata metadata =
+ InternalMetadata.newMetadata(TransportFrameUtil.toRawSerializedHeaders(headerValues));
+ synchronized (state.lock) {
+ // There's no pending onReadCompleted callback so we can report trailers now.
+ state.transportHeadersReceived(metadata, endOfStream);
+ }
+ }
+
+ private boolean haveTrailersBeenReported() {
+ synchronized (state.lock) {
+ return trailerList != null && state.readClosed;
+ }
+ }
+
+ private Status toGrpcStatus(UrlResponseInfo info) {
+ return GrpcUtil.httpStatusToGrpcStatus(info.getHttpStatusCode());
+ }
+ }
+
+ private static class PendingData {
+ ByteBuffer buffer;
+ boolean endOfStream;
+ boolean flush;
+
+ PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush) {
+ this.buffer = buffer;
+ this.endOfStream = endOfStream;
+ this.flush = flush;
+ }
+ }
+}
diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java
new file mode 100644
index 000000000..65d7b304e
--- /dev/null
+++ b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2016, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.cronet;
+
+import com.google.common.base.Preconditions;
+import io.grpc.Attributes;
+import io.grpc.CallOptions;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
+import io.grpc.internal.ConnectionClientTransport;
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.LogId;
+import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.WithLogId;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A cronet-based {@link ConnectionClientTransport} implementation.
+ */
+class CronetClientTransport implements ConnectionClientTransport, WithLogId {
+ private final LogId logId = LogId.allocate(getClass().getName());
+ private final InetSocketAddress address;
+ private final String authority;
+ private final String userAgent;
+ private Listener listener;
+ private final Object lock = new Object();
+ @GuardedBy("lock")
+ private final Set<CronetClientStream> streams =
+ new HashSet<CronetClientStream>();
+ private final Executor executor;
+ private final int maxMessageSize;
+ private final boolean alwaysUsePut;
+ // Indicates the transport is in go-away state: no new streams will be processed,
+ // but existing streams may continue.
+ @GuardedBy("lock")
+ private boolean goAway;
+ // Used to indicate the special phase while we are going to enter go-away state but before
+ // goAway is turned to true, see the comment at where this is set about why it is needed.
+ @GuardedBy("lock")
+ private boolean startedGoAway;
+ @GuardedBy("lock")
+ private Status goAwayStatus;
+ @GuardedBy("lock")
+ private boolean stopped;
+ @GuardedBy("lock")
+ // Whether this transport has started.
+ private boolean started;
+ private StreamBuilderFactory streamFactory;
+
+ CronetClientTransport(
+ StreamBuilderFactory streamFactory,
+ InetSocketAddress address,
+ String authority,
+ @Nullable String userAgent,
+ Executor executor,
+ int maxMessageSize,
+ boolean alwaysUsePut) {
+ this.address = Preconditions.checkNotNull(address, "address");
+ this.authority = authority;
+ this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent);
+ this.maxMessageSize = maxMessageSize;
+ this.alwaysUsePut = alwaysUsePut;
+ this.executor = Preconditions.checkNotNull(executor, "executor");
+ this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory");
+ }
+
+ @Override
+ public CronetClientStream newStream(final MethodDescriptor<?, ?> method, final Metadata headers,
+ final CallOptions callOptions) {
+ Preconditions.checkNotNull(method, "method");
+ Preconditions.checkNotNull(headers, "headers");
+
+ final String defaultPath = "/" + method.getFullMethodName();
+ final String url = "https://" + authority + defaultPath;
+
+ final StatsTraceContext statsTraceCtx =
+ StatsTraceContext.newClientContext(callOptions, headers);
+ class StartCallback implements Runnable {
+ final CronetClientStream clientStream = new CronetClientStream(
+ url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
+ alwaysUsePut, method, statsTraceCtx, callOptions);
+
+ @Override
+ public void run() {
+ synchronized (lock) {
+ if (goAway) {
+ clientStream.transportState().transportReportStatus(goAwayStatus, true, new Metadata());
+ } else if (started) {
+ startStream(clientStream);
+ } else {
+ throw new AssertionError("Transport is not started");
+ }
+ }
+ }
+ }
+
+ return new StartCallback().clientStream;
+ }
+
+ @GuardedBy("lock")
+ private void startStream(CronetClientStream stream) {
+ streams.add(stream);
+ stream.transportState().start(streamFactory);
+ }
+
+ @Override
+ public Runnable start(Listener listener) {
+ this.listener = Preconditions.checkNotNull(listener, "listener");
+ synchronized (lock) {
+ started = true;
+ }
+ return new Runnable() {
+ @Override
+ public void run() {
+ // Listener callbacks should not be called simultaneously
+ CronetClientTransport.this.listener.transportReady();
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + "(" + address + ")";
+ }
+
+ public void shutdown() {
+ shutdown(Status.UNAVAILABLE.withDescription("Transport stopped"));
+ }
+
+ @Override
+ public void shutdown(Status status) {
+ synchronized (lock) {
+ if (goAway) {
+ return;
+ }
+ }
+
+ startGoAway(status);
+ }
+
+ @Override
+ public void shutdownNow(Status status) {
+ shutdown(status);
+ ArrayList<CronetClientStream> streamsCopy;
+ synchronized (lock) {
+ // A copy is always necessary since cancel() can call finishStream() which calls
+ // streams.remove()
+ streamsCopy = new ArrayList<CronetClientStream>(streams);
+ }
+ for (int i = 0; i < streamsCopy.size(); i++) {
+ // Avoid deadlock by calling into stream without lock held
+ streamsCopy.get(i).cancel(status);
+ }
+ stopIfNecessary();
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ // TODO(zhangkun83): fill channel security attributes
+ return Attributes.EMPTY;
+ }
+
+ private void startGoAway(Status status) {
+ synchronized (lock) {
+ if (startedGoAway) {
+ // Another go-away is in progress, ignore this one.
+ return;
+ }
+ // We use startedGoAway here instead of goAway, because once the goAway becomes true, other
+ // thread in stopIfNecessary() may stop the transport and cause the
+ // listener.transportTerminated() be called before listener.transportShutdown().
+ startedGoAway = true;
+ }
+
+ listener.transportShutdown(status);
+
+ synchronized (lock) {
+ goAway = true;
+ goAwayStatus = status;
+ }
+
+ stopIfNecessary();
+ }
+
+ @Override
+ public void ping(final PingCallback callback, Executor executor) {
+ // TODO(ericgribkoff): depend on cronet implemenetation
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LogId getLogId() {
+ return logId;
+ }
+
+ /**
+ * When the transport is in goAway state, we should stop it once all active streams finish.
+ */
+ void stopIfNecessary() {
+ synchronized (lock) {
+ if (goAway && !stopped && streams.size() == 0) {
+ stopped = true;
+ } else {
+ return;
+ }
+ }
+ listener.transportTerminated();
+ }
+
+ void finishStream(CronetClientStream stream, Status status) {
+ synchronized (lock) {
+ if (streams.remove(stream)) {
+ boolean isCancelled = (status.getCode() == Code.CANCELLED
+ || status.getCode() == Code.DEADLINE_EXCEEDED);
+ stream.transportState().transportReportStatus(status, isCancelled, new Metadata());
+ } else {
+ return;
+ }
+ }
+ stopIfNecessary();
+ }
+}
diff --git a/cronet/src/main/java/io/grpc/cronet/CronetWritableBuffer.java b/cronet/src/main/java/io/grpc/cronet/CronetWritableBuffer.java
new file mode 100644
index 000000000..f0e713a2b
--- /dev/null
+++ b/cronet/src/main/java/io/grpc/cronet/CronetWritableBuffer.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2016, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.cronet;
+
+import com.google.common.base.Preconditions;
+import io.grpc.internal.WritableBuffer;
+import java.nio.ByteBuffer;
+
+class CronetWritableBuffer implements WritableBuffer {
+ private final ByteBuffer buffer;
+
+ public CronetWritableBuffer(ByteBuffer buffer, int capacity) {
+ this.buffer = Preconditions.checkNotNull(buffer, "buffer");
+ }
+
+ @Override
+ public void write(byte[] src, int srcIndex, int length) {
+ buffer.put(src, srcIndex, length);
+ }
+
+ @Override
+ public void write(byte b) {
+ buffer.put(b);
+ }
+
+ @Override
+ public int writableBytes() {
+ return buffer.remaining();
+ }
+
+ @Override
+ public int readableBytes() {
+ return buffer.position();
+ }
+
+ @Override
+ public void release() {
+ }
+
+ ByteBuffer buffer() {
+ return buffer;
+ }
+}
diff --git a/cronet/src/main/java/io/grpc/cronet/CronetWritableBufferAllocator.java b/cronet/src/main/java/io/grpc/cronet/CronetWritableBufferAllocator.java
new file mode 100644
index 000000000..c373b03eb
--- /dev/null
+++ b/cronet/src/main/java/io/grpc/cronet/CronetWritableBufferAllocator.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2016, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.cronet;
+
+import io.grpc.internal.WritableBuffer;
+import io.grpc.internal.WritableBufferAllocator;
+import java.nio.ByteBuffer;
+
+/**
+ * The default allocator for {@link CronetWritableBuffer}s used by the Cronet transport.
+ */
+class CronetWritableBufferAllocator implements WritableBufferAllocator {
+ // Set the maximum buffer size to 1MB
+ private static final int MAX_BUFFER = 1024 * 1024;
+
+ /**
+ * Construct a new instance.
+ */
+ CronetWritableBufferAllocator() {
+ }
+
+ @Override
+ public WritableBuffer allocate(int capacityHint) {
+ capacityHint = Math.min(MAX_BUFFER, capacityHint);
+ return new CronetWritableBuffer(ByteBuffer.allocateDirect(capacityHint), capacityHint);
+ }
+}