aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/public/pw_rpc/internal/client_call.h
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc/public/pw_rpc/internal/client_call.h')
-rw-r--r--pw_rpc/public/pw_rpc/internal/client_call.h99
1 files changed, 50 insertions, 49 deletions
diff --git a/pw_rpc/public/pw_rpc/internal/client_call.h b/pw_rpc/public/pw_rpc/internal/client_call.h
index c4d9c1073..793d581e4 100644
--- a/pw_rpc/public/pw_rpc/internal/client_call.h
+++ b/pw_rpc/public/pw_rpc/internal/client_call.h
@@ -18,6 +18,7 @@
#include "pw_bytes/span.h"
#include "pw_function/function.h"
#include "pw_rpc/internal/call.h"
+#include "pw_rpc/internal/endpoint.h"
#include "pw_rpc/internal/lock.h"
namespace pw::rpc::internal {
@@ -25,37 +26,52 @@ namespace pw::rpc::internal {
// A Call object, as used by an RPC client.
class ClientCall : public Call {
public:
- ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) {
- rpc_lock().lock();
- CloseClientCall();
- rpc_lock().unlock();
+ ~ClientCall() PW_LOCKS_EXCLUDED(rpc_lock()) { Abandon(); }
+
+ uint32_t id() const PW_LOCKS_EXCLUDED(rpc_lock()) {
+ RpcLockGuard lock;
+ return Call::id();
}
protected:
+ // Initializes CallProperties for a struct-based client call impl.
+ static constexpr CallProperties StructCallProps(MethodType type) {
+ return CallProperties(type, kClientCall, kProtoStruct);
+ }
+
+ // Initializes CallProperties for a raw client call.
+ static constexpr CallProperties RawCallProps(MethodType type) {
+ return CallProperties(type, kClientCall, kRawProto);
+ }
+
constexpr ClientCall() = default;
- ClientCall(Endpoint& client,
+ ClientCall(LockedEndpoint& client,
uint32_t channel_id,
uint32_t service_id,
uint32_t method_id,
- MethodType type)
- : Call(client, channel_id, service_id, method_id, type) {}
+ CallProperties properties) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
+ : Call(client, channel_id, service_id, method_id, properties) {}
+
+ // Public function that closes a call client-side without cancelling it on the
+ // server.
+ void Abandon() PW_LOCKS_EXCLUDED(rpc_lock()) {
+ RpcLockGuard lock;
+ CloseClientCall();
+ }
- // Sends CLIENT_STREAM_END if applicable, releases any held payload buffer,
- // and marks the call as closed.
+ // Sends CLIENT_STREAM_END if applicable and marks the call as closed.
void CloseClientCall() PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
void MoveClientCallFrom(ClientCall& other)
- PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
- CloseClientCall();
- MoveFrom(other);
- }
+ PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock());
};
// Unary response client calls receive both a payload and the status in their
// on_completed callback. The on_next callback is not used.
class UnaryResponseClientCall : public ClientCall {
public:
+ // Start call for raw unary response RPCs.
template <typename CallType>
static CallType Start(Endpoint& client,
uint32_t channel_id,
@@ -65,34 +81,28 @@ class UnaryResponseClientCall : public ClientCall {
Function<void(Status)>&& on_error,
ConstByteSpan request) PW_LOCKS_EXCLUDED(rpc_lock()) {
rpc_lock().lock();
- CallType call(client, channel_id, service_id, method_id);
+ CallType call(client.ClaimLocked(), channel_id, service_id, method_id);
call.set_on_completed_locked(std::move(on_completed));
call.set_on_error_locked(std::move(on_error));
call.SendInitialClientRequest(request);
+ client.CleanUpCalls();
return call;
}
void HandleCompleted(ConstByteSpan response, Status status)
- PW_UNLOCK_FUNCTION(rpc_lock()) {
- const bool invoke_callback = on_completed_ != nullptr;
- UnregisterAndMarkClosed();
-
- rpc_lock().unlock();
- if (invoke_callback) {
- on_completed_(response, status);
- }
- }
+ PW_UNLOCK_FUNCTION(rpc_lock());
protected:
constexpr UnaryResponseClientCall() = default;
- UnaryResponseClientCall(Endpoint& client,
+ UnaryResponseClientCall(LockedEndpoint& client,
uint32_t channel_id,
uint32_t service_id,
uint32_t method_id,
- MethodType type)
- : ClientCall(client, channel_id, service_id, method_id, type) {}
+ CallProperties properties)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
+ : ClientCall(client, channel_id, service_id, method_id, properties) {}
UnaryResponseClientCall(UnaryResponseClientCall&& other) {
*this = std::move(other);
@@ -100,7 +110,7 @@ class UnaryResponseClientCall : public ClientCall {
UnaryResponseClientCall& operator=(UnaryResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock()) {
- LockGuard lock(rpc_lock());
+ RpcLockGuard lock;
MoveUnaryResponseClientCallFrom(other);
return *this;
}
@@ -113,8 +123,7 @@ class UnaryResponseClientCall : public ClientCall {
void set_on_completed(Function<void(ConstByteSpan, Status)>&& on_completed)
PW_LOCKS_EXCLUDED(rpc_lock()) {
- // TODO(pwbug/597): Ensure on_completed_ is properly guarded.
- LockGuard lock(rpc_lock());
+ RpcLockGuard lock;
set_on_completed_locked(std::move(on_completed));
}
@@ -127,13 +136,14 @@ class UnaryResponseClientCall : public ClientCall {
private:
using internal::ClientCall::set_on_next; // Not used in unary response calls.
- Function<void(ConstByteSpan, Status)> on_completed_;
+ Function<void(ConstByteSpan, Status)> on_completed_ PW_GUARDED_BY(rpc_lock());
};
// Stream response client calls only receive the status in their on_completed
// callback. Payloads are sent through the on_next callback.
class StreamResponseClientCall : public ClientCall {
public:
+ // Start call for raw stream response RPCs.
template <typename CallType>
static CallType Start(Endpoint& client,
uint32_t channel_id,
@@ -144,37 +154,29 @@ class StreamResponseClientCall : public ClientCall {
Function<void(Status)>&& on_error,
ConstByteSpan request) {
rpc_lock().lock();
- CallType call(client, channel_id, service_id, method_id);
+ CallType call(client.ClaimLocked(), channel_id, service_id, method_id);
call.set_on_next_locked(std::move(on_next));
call.set_on_completed_locked(std::move(on_completed));
call.set_on_error_locked(std::move(on_error));
call.SendInitialClientRequest(request);
+ client.CleanUpCalls();
return call;
}
- void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock()) {
- const bool invoke_callback = on_completed_ != nullptr;
-
- UnregisterAndMarkClosed();
- rpc_lock().unlock();
-
- // TODO(pwbug/597): Ensure on_completed_ is properly guarded.
- if (invoke_callback) {
- on_completed_(status);
- }
- }
+ void HandleCompleted(Status status) PW_UNLOCK_FUNCTION(rpc_lock());
protected:
constexpr StreamResponseClientCall() = default;
- StreamResponseClientCall(Endpoint& client,
+ StreamResponseClientCall(LockedEndpoint& client,
uint32_t channel_id,
uint32_t service_id,
uint32_t method_id,
- MethodType type)
- : ClientCall(client, channel_id, service_id, method_id, type) {}
+ CallProperties properties)
+ PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock())
+ : ClientCall(client, channel_id, service_id, method_id, properties) {}
StreamResponseClientCall(StreamResponseClientCall&& other) {
*this = std::move(other);
@@ -182,7 +184,7 @@ class StreamResponseClientCall : public ClientCall {
StreamResponseClientCall& operator=(StreamResponseClientCall&& other)
PW_LOCKS_EXCLUDED(rpc_lock()) {
- LockGuard lock(rpc_lock());
+ RpcLockGuard lock;
MoveStreamResponseClientCallFrom(other);
return *this;
}
@@ -195,8 +197,7 @@ class StreamResponseClientCall : public ClientCall {
void set_on_completed(Function<void(Status)>&& on_completed)
PW_LOCKS_EXCLUDED(rpc_lock()) {
- // TODO(pwbug/597): Ensure on_completed_ is properly guarded.
- LockGuard lock(rpc_lock());
+ RpcLockGuard lock;
set_on_completed_locked(std::move(on_completed));
}
@@ -206,7 +207,7 @@ class StreamResponseClientCall : public ClientCall {
}
private:
- Function<void(Status)> on_completed_;
+ Function<void(Status)> on_completed_ PW_GUARDED_BY(rpc_lock());
};
} // namespace pw::rpc::internal