aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRan <ran-su@users.noreply.github.com>2020-04-06 10:55:24 -0700
committerGitHub <noreply@github.com>2020-04-06 10:55:24 -0700
commit37913fd3b1dd0b20c2bba24cf68eafae83d1acbe (patch)
treee438931698d6ebd87267456211e5aba10dfbf30b
parent24e3d9587eed013636d4419dc5d3dd926cbc48f0 (diff)
downloadgrpc-grpc-java-37913fd3b1dd0b20c2bba24cf68eafae83d1acbe.tar.gz
stub: add Blocking StubType to blocking ClientCalls methods. (#6900)
-rw-r--r--stub/src/main/java/io/grpc/stub/ClientCalls.java8
-rw-r--r--stub/src/test/java/io/grpc/stub/ClientCallsTest.java58
2 files changed, 64 insertions, 2 deletions
diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java
index 82c370834..dcfc29e2d 100644
--- a/stub/src/main/java/io/grpc/stub/ClientCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java
@@ -125,7 +125,9 @@ public final class ClientCalls {
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
boolean interrupt = false;
- ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
+ ClientCall<ReqT, RespT> call = channel.newCall(method,
+ callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
+ .withExecutor(executor));
try {
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
@@ -177,7 +179,9 @@ public final class ClientCalls {
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
- ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
+ ClientCall<ReqT, RespT> call = channel.newCall(method,
+ callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
+ .withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
asyncUnaryRequestCall(call, req, result.listener(), true);
return result;
diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
index a63648770..b33b94c26 100644
--- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java
@@ -23,6 +23,9 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -43,6 +46,7 @@ import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.NoopClientCall;
+import io.grpc.stub.ClientCalls.StubType;
import io.grpc.stub.ServerCalls.NoopStreamObserver;
import io.grpc.stub.ServerCalls.ServerStreamingMethod;
import io.grpc.stub.ServerCalls.UnaryMethod;
@@ -62,6 +66,10 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Captor;
+import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
@@ -83,6 +91,12 @@ public class ClientCallsTest {
private Server server;
private ManagedChannel channel;
+ @Mock
+ private ManagedChannel mockChannel;
+ @Captor
+ private ArgumentCaptor<MethodDescriptor<?, ?>> methodDescriptorCaptor;
+ @Captor
+ private ArgumentCaptor<CallOptions> callOptionsCaptor;
@Before
public void setUp() {
@@ -204,6 +218,50 @@ public class ClientCallsTest {
}
@Test
+ public void blockingUnaryCall_HasBlockingStubType() {
+ NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() {
+ @Override
+ public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) {
+ listener.onMessage(1);
+ listener.onClose(Status.OK, new Metadata());
+ }
+ };
+ when(mockChannel.newCall(
+ ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class)))
+ .thenReturn(call);
+
+ Integer unused =
+ ClientCalls.blockingUnaryCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1);
+
+ verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture());
+ CallOptions capturedCallOption = callOptionsCaptor.getValue();
+ assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION))
+ .isEquivalentAccordingToCompareTo(StubType.BLOCKING);
+ }
+
+ @Test
+ public void blockingServerStreamingCall_HasBlockingStubType() {
+ NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() {
+ @Override
+ public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) {
+ listener.onMessage(1);
+ listener.onClose(Status.OK, new Metadata());
+ }
+ };
+ when(mockChannel.newCall(
+ ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class)))
+ .thenReturn(call);
+
+ Iterator<Integer> unused =
+ ClientCalls.blockingServerStreamingCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1);
+
+ verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture());
+ CallOptions capturedCallOption = callOptionsCaptor.getValue();
+ assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION))
+ .isEquivalentAccordingToCompareTo(StubType.BLOCKING);
+ }
+
+ @Test
public void unaryFutureCallSuccess() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =
new AtomicReference<>();