diff options
author | Ran <ran-su@users.noreply.github.com> | 2020-04-06 10:55:24 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-06 10:55:24 -0700 |
commit | 37913fd3b1dd0b20c2bba24cf68eafae83d1acbe (patch) | |
tree | e438931698d6ebd87267456211e5aba10dfbf30b | |
parent | 24e3d9587eed013636d4419dc5d3dd926cbc48f0 (diff) | |
download | grpc-grpc-java-37913fd3b1dd0b20c2bba24cf68eafae83d1acbe.tar.gz |
stub: add Blocking StubType to blocking ClientCalls methods. (#6900)
-rw-r--r-- | stub/src/main/java/io/grpc/stub/ClientCalls.java | 8 | ||||
-rw-r--r-- | stub/src/test/java/io/grpc/stub/ClientCallsTest.java | 58 |
2 files changed, 64 insertions, 2 deletions
diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 82c370834..dcfc29e2d 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -125,7 +125,9 @@ public final class ClientCalls { Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) { ThreadlessExecutor executor = new ThreadlessExecutor(); boolean interrupt = false; - ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor)); + ClientCall<ReqT, RespT> call = channel.newCall(method, + callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING) + .withExecutor(executor)); try { ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req); while (!responseFuture.isDone()) { @@ -177,7 +179,9 @@ public final class ClientCalls { public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) { ThreadlessExecutor executor = new ThreadlessExecutor(); - ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor)); + ClientCall<ReqT, RespT> call = channel.newCall(method, + callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING) + .withExecutor(executor)); BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor); asyncUnaryRequestCall(call, req, result.listener(), true); return result; diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java index a63648770..b33b94c26 100644 --- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java @@ -23,6 +23,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -43,6 +46,7 @@ import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.NoopClientCall; +import io.grpc.stub.ClientCalls.StubType; import io.grpc.stub.ServerCalls.NoopStreamObserver; import io.grpc.stub.ServerCalls.ServerStreamingMethod; import io.grpc.stub.ServerCalls.UnaryMethod; @@ -62,6 +66,10 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Captor; +import org.mockito.Mock; import org.mockito.MockitoAnnotations; /** @@ -83,6 +91,12 @@ public class ClientCallsTest { private Server server; private ManagedChannel channel; + @Mock + private ManagedChannel mockChannel; + @Captor + private ArgumentCaptor<MethodDescriptor<?, ?>> methodDescriptorCaptor; + @Captor + private ArgumentCaptor<CallOptions> callOptionsCaptor; @Before public void setUp() { @@ -204,6 +218,50 @@ public class ClientCallsTest { } @Test + public void blockingUnaryCall_HasBlockingStubType() { + NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() { + @Override + public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) { + listener.onMessage(1); + listener.onClose(Status.OK, new Metadata()); + } + }; + when(mockChannel.newCall( + ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class))) + .thenReturn(call); + + Integer unused = + ClientCalls.blockingUnaryCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1); + + verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture()); + CallOptions capturedCallOption = callOptionsCaptor.getValue(); + assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION)) + .isEquivalentAccordingToCompareTo(StubType.BLOCKING); + } + + @Test + public void blockingServerStreamingCall_HasBlockingStubType() { + NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() { + @Override + public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) { + listener.onMessage(1); + listener.onClose(Status.OK, new Metadata()); + } + }; + when(mockChannel.newCall( + ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class))) + .thenReturn(call); + + Iterator<Integer> unused = + ClientCalls.blockingServerStreamingCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1); + + verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture()); + CallOptions capturedCallOption = callOptionsCaptor.getValue(); + assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION)) + .isEquivalentAccordingToCompareTo(StubType.BLOCKING); + } + + @Test public void unaryFutureCallSuccess() throws Exception { final AtomicReference<ClientCall.Listener<String>> listener = new AtomicReference<>(); |