aboutsummaryrefslogtreecommitdiff
path: root/rust/src/wrapper/drivers/rtk.rs
diff options
context:
space:
mode:
Diffstat (limited to 'rust/src/wrapper/drivers/rtk.rs')
-rw-r--r--rust/src/wrapper/drivers/rtk.rs141
1 files changed, 141 insertions, 0 deletions
diff --git a/rust/src/wrapper/drivers/rtk.rs b/rust/src/wrapper/drivers/rtk.rs
new file mode 100644
index 0000000..1f629d1
--- /dev/null
+++ b/rust/src/wrapper/drivers/rtk.rs
@@ -0,0 +1,141 @@
+// Copyright 2023 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Drivers for Realtek controllers
+
+use crate::wrapper::{host::Host, PyObjectExt};
+use pyo3::{intern, types::PyModule, PyObject, PyResult, Python, ToPyObject};
+use pyo3_asyncio::tokio::into_future;
+
+pub use crate::internal::drivers::rtk::{Firmware, Patch};
+
+/// Driver for a Realtek controller
+pub struct Driver(PyObject);
+
+impl Driver {
+ /// Locate the driver for the provided host.
+ pub async fn for_host(host: &Host, force: bool) -> PyResult<Option<Self>> {
+ Python::with_gil(|py| {
+ PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
+ .getattr(intern!(py, "Driver"))?
+ .call_method1(intern!(py, "for_host"), (&host.obj, force))
+ .and_then(into_future)
+ })?
+ .await
+ .map(|obj| obj.into_option(Self))
+ }
+
+ /// Check if the host has a known driver.
+ pub async fn check(host: &Host) -> PyResult<bool> {
+ Python::with_gil(|py| {
+ PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
+ .getattr(intern!(py, "Driver"))?
+ .call_method1(intern!(py, "check"), (&host.obj,))
+ .and_then(|obj| obj.extract::<bool>())
+ })
+ }
+
+ /// Find the [DriverInfo] for the host, if one matches
+ pub async fn driver_info_for_host(host: &Host) -> PyResult<Option<DriverInfo>> {
+ Python::with_gil(|py| {
+ PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
+ .getattr(intern!(py, "Driver"))?
+ .call_method1(intern!(py, "driver_info_for_host"), (&host.obj,))
+ .and_then(into_future)
+ })?
+ .await
+ .map(|obj| obj.into_option(DriverInfo))
+ }
+
+ /// Send a command to the device to drop firmware
+ pub async fn drop_firmware(host: &mut Host) -> PyResult<()> {
+ Python::with_gil(|py| {
+ PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
+ .getattr(intern!(py, "Driver"))?
+ .call_method1(intern!(py, "drop_firmware"), (&host.obj,))
+ .and_then(into_future)
+ })?
+ .await
+ .map(|_| ())
+ }
+
+ /// Load firmware onto the device.
+ pub async fn download_firmware(&mut self) -> PyResult<()> {
+ Python::with_gil(|py| {
+ self.0
+ .call_method0(py, intern!(py, "download_firmware"))
+ .and_then(|coroutine| into_future(coroutine.as_ref(py)))
+ })?
+ .await
+ .map(|_| ())
+ }
+}
+
+/// Metadata about a known driver & applicable device
+pub struct DriverInfo(PyObject);
+
+impl DriverInfo {
+ /// Returns a list of all drivers that Bumble knows how to handle.
+ pub fn all_drivers() -> PyResult<Vec<DriverInfo>> {
+ Python::with_gil(|py| {
+ PyModule::import(py, intern!(py, "bumble.drivers.rtk"))?
+ .getattr(intern!(py, "Driver"))?
+ .getattr(intern!(py, "DRIVER_INFOS"))?
+ .iter()?
+ .map(|r| r.map(|h| DriverInfo(h.to_object(py))))
+ .collect::<PyResult<Vec<_>>>()
+ })
+ }
+
+ /// The firmware file name to load from the filesystem, e.g. `foo_fw.bin`.
+ pub fn firmware_name(&self) -> PyResult<String> {
+ Python::with_gil(|py| {
+ self.0
+ .getattr(py, intern!(py, "fw_name"))?
+ .as_ref(py)
+ .extract::<String>()
+ })
+ }
+
+ /// The config file name, if any, to load from the filesystem, e.g. `foo_config.bin`.
+ pub fn config_name(&self) -> PyResult<Option<String>> {
+ Python::with_gil(|py| {
+ let obj = self.0.getattr(py, intern!(py, "config_name"))?;
+ let handle = obj.as_ref(py);
+
+ if handle.is_none() {
+ Ok(None)
+ } else {
+ handle
+ .extract::<String>()
+ .map(|s| if s.is_empty() { None } else { Some(s) })
+ }
+ })
+ }
+
+ /// Whether or not config is required.
+ pub fn config_needed(&self) -> PyResult<bool> {
+ Python::with_gil(|py| {
+ self.0
+ .getattr(py, intern!(py, "config_needed"))?
+ .as_ref(py)
+ .extract::<bool>()
+ })
+ }
+
+ /// ROM id
+ pub fn rom(&self) -> PyResult<u32> {
+ Python::with_gil(|py| self.0.getattr(py, intern!(py, "rom"))?.as_ref(py).extract())
+ }
+}