diff options
Diffstat (limited to 'pw_rpc/public/pw_rpc/internal/client_call.h')
-rw-r--r-- | pw_rpc/public/pw_rpc/internal/client_call.h | 99 |
1 files changed, 50 insertions, 49 deletions
diff --git a/pw_rpc/public/pw_rpc/internal/client_call.h b/pw_rpc/public/pw_rpc/internal/client_call.h index c4d9c1073..793d581e4 100644 --- a/pw_rpc/public/pw_rpc/internal/client_call.h +++ b/pw_rpc/public/pw_rpc/internal/client_call.h @@ -18,6 +18,7 @@ #include "pw_bytes/span.h" #include "pw_function/function.h" #include "pw_rpc/internal/call.h" +#include "pw_rpc/internal/endpoint.h" #include "pw_rpc/internal/lock.h" namespace pw::rpc::internal { @@ -25,37 +26,52 @@ namespace pw::rpc::internal { // A Call object, as used by an RPC client. class ClientCall : public Call { public: - ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) { - rpc_lock().lock(); - CloseClientCall(); - rpc_lock().unlock(); + ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) { Abandon(); } + + uint32_t id() const PW_LOCKS_EXCLUDED(rpc_lock()) { + RpcLockGuard lock; + return Call::id(); } protected: + // Initializes CallProperties for a struct-based client call impl. + static constexpr CallProperties StructCallProps(MethodType type) { + return CallProperties(type, kClientCall, kProtoStruct); + } + + // Initializes CallProperties for a raw client call. + static constexpr CallProperties RawCallProps(MethodType type) { + return CallProperties(type, kClientCall, kRawProto); + } + constexpr ClientCall() = default; - ClientCall(Endpoint& client, + ClientCall(LockedEndpoint& client, uint32_t channel_id, uint32_t service_id, uint32_t method_id, - MethodType type) - : Call(client, channel_id, service_id, method_id, type) {} + CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) + : Call(client, channel_id, service_id, method_id, properties) {} + + // Public function that closes a call client-side without cancelling it on the + // server. + void Abandon() PW_LOCKS_EXCLUDED(rpc_lock()) { + RpcLockGuard lock; + CloseClientCall(); + } - // Sends CLIENT_STREAM_END if applicable, releases any held payload buffer, - // and marks the call as closed. + // Sends CLIENT_STREAM_END if applicable and marks the call as closed. void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); void MoveClientCallFrom(ClientCall& other) - PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { - CloseClientCall(); - MoveFrom(other); - } + PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()); }; // Unary response client calls receive both a payload and the status in their // on_completed callback. The on_next callback is not used. class UnaryResponseClientCall : public ClientCall { public: + // Start call for raw unary response RPCs. template <typename CallType> static CallType Start(Endpoint& client, uint32_t channel_id, @@ -65,34 +81,28 @@ class UnaryResponseClientCall : public ClientCall { Function<void(Status)>&& on_error, ConstByteSpan request) PW_LOCKS_EXCLUDED(rpc_lock()) { rpc_lock().lock(); - CallType call(client, channel_id, service_id, method_id); + CallType call(client.ClaimLocked(), channel_id, service_id, method_id); call.set_on_completed_locked(std::move(on_completed)); call.set_on_error_locked(std::move(on_error)); call.SendInitialClientRequest(request); + client.CleanUpCalls(); return call; } void HandleCompleted(ConstByteSpan response, Status status) - PW_UNLOCK_FUNCTION(rpc_lock()) { - const bool invoke_callback = on_completed_ != nullptr; - UnregisterAndMarkClosed(); - - rpc_lock().unlock(); - if (invoke_callback) { - on_completed_(response, status); - } - } + PW_UNLOCK_FUNCTION(rpc_lock()); protected: constexpr UnaryResponseClientCall() = default; - UnaryResponseClientCall(Endpoint& client, + UnaryResponseClientCall(LockedEndpoint& client, uint32_t channel_id, uint32_t service_id, uint32_t method_id, - MethodType type) - : ClientCall(client, channel_id, service_id, method_id, type) {} + CallProperties properties) + PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) + : ClientCall(client, channel_id, service_id, method_id, properties) {} UnaryResponseClientCall(UnaryResponseClientCall&& other) { *this = std::move(other); @@ -100,7 +110,7 @@ class UnaryResponseClientCall : public ClientCall { UnaryResponseClientCall& operator=(UnaryResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) { - LockGuard lock(rpc_lock()); + RpcLockGuard lock; MoveUnaryResponseClientCallFrom(other); return *this; } @@ -113,8 +123,7 @@ class UnaryResponseClientCall : public ClientCall { void set_on_completed(Function<void(ConstByteSpan, Status)>&& on_completed) PW_LOCKS_EXCLUDED(rpc_lock()) { - // TODO(pwbug/597): Ensure on_completed_ is properly guarded. - LockGuard lock(rpc_lock()); + RpcLockGuard lock; set_on_completed_locked(std::move(on_completed)); } @@ -127,13 +136,14 @@ class UnaryResponseClientCall : public ClientCall { private: using internal::ClientCall::set_on_next; // Not used in unary response calls. - Function<void(ConstByteSpan, Status)> on_completed_; + Function<void(ConstByteSpan, Status)> on_completed_ PW_GUARDED_BY(rpc_lock()); }; // Stream response client calls only receive the status in their on_completed // callback. Payloads are sent through the on_next callback. class StreamResponseClientCall : public ClientCall { public: + // Start call for raw stream response RPCs. template <typename CallType> static CallType Start(Endpoint& client, uint32_t channel_id, @@ -144,37 +154,29 @@ class StreamResponseClientCall : public ClientCall { Function<void(Status)>&& on_error, ConstByteSpan request) { rpc_lock().lock(); - CallType call(client, channel_id, service_id, method_id); + CallType call(client.ClaimLocked(), channel_id, service_id, method_id); call.set_on_next_locked(std::move(on_next)); call.set_on_completed_locked(std::move(on_completed)); call.set_on_error_locked(std::move(on_error)); call.SendInitialClientRequest(request); + client.CleanUpCalls(); return call; } - void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) { - const bool invoke_callback = on_completed_ != nullptr; - - UnregisterAndMarkClosed(); - rpc_lock().unlock(); - - // TODO(pwbug/597): Ensure on_completed_ is properly guarded. - if (invoke_callback) { - on_completed_(status); - } - } + void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock()); protected: constexpr StreamResponseClientCall() = default; - StreamResponseClientCall(Endpoint& client, + StreamResponseClientCall(LockedEndpoint& client, uint32_t channel_id, uint32_t service_id, uint32_t method_id, - MethodType type) - : ClientCall(client, channel_id, service_id, method_id, type) {} + CallProperties properties) + PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) + : ClientCall(client, channel_id, service_id, method_id, properties) {} StreamResponseClientCall(StreamResponseClientCall&& other) { *this = std::move(other); @@ -182,7 +184,7 @@ class StreamResponseClientCall : public ClientCall { StreamResponseClientCall& operator=(StreamResponseClientCall&& other) PW_LOCKS_EXCLUDED(rpc_lock()) { - LockGuard lock(rpc_lock()); + RpcLockGuard lock; MoveStreamResponseClientCallFrom(other); return *this; } @@ -195,8 +197,7 @@ class StreamResponseClientCall : public ClientCall { void set_on_completed(Function<void(Status)>&& on_completed) PW_LOCKS_EXCLUDED(rpc_lock()) { - // TODO(pwbug/597): Ensure on_completed_ is properly guarded. - LockGuard lock(rpc_lock()); + RpcLockGuard lock; set_on_completed_locked(std::move(on_completed)); } @@ -206,7 +207,7 @@ class StreamResponseClientCall : public ClientCall { } private: - Function<void(Status)> on_completed_; + Function<void(Status)> on_completed_ PW_GUARDED_BY(rpc_lock()); }; } // namespace pw::rpc::internal |