aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Lawson <benlawson@google.com>2024-02-21 15:52:13 -0800
committerBen Lawson <benlawson@google.com>2024-03-07 16:18:49 -0800
commit256044a7893375748e0fd535f1acbd205c6dc97f (patch)
tree2036dceb24c15da57b6eed88ba58e06740420da1
parentde8f3d9c1e682fb8076f3785c8011bb1c0c2b1bd (diff)
downloadbumble-256044a7893375748e0fd535f1acbd205c6dc97f.tar.gz
Implement Pandora extended advertising
Support setting the PHY of Pandora scans.
-rw-r--r--.gitignore3
-rw-r--r--bumble/device.py20
-rw-r--r--bumble/pandora/host.py144
3 files changed, 156 insertions, 11 deletions
diff --git a/.gitignore b/.gitignore
index 830ec1a..1a5fb9d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,9 @@ __pycache__
# generated by setuptools_scm
bumble/_version.py
.vscode/launch.json
+.vscode/settings.json
/.idea
venv/
.venv/
+# snoop logs
+out/
diff --git a/bumble/device.py b/bumble/device.py
index 01a5cbe..48f9d58 100644
--- a/bumble/device.py
+++ b/bumble/device.py
@@ -2112,6 +2112,20 @@ class Device(CompositeEventEmitter):
Returns:
An AdvertisingSet instance.
"""
+ # Instantiate default values
+ if advertising_parameters is None:
+ advertising_parameters = AdvertisingParameters()
+
+ if (
+ not advertising_parameters.advertising_event_properties.is_legacy
+ and advertising_data
+ and scan_response_data
+ ):
+ raise ValueError(
+ "Extended advertisements can't have both data and scan \
+ response data"
+ )
+
# Allocate a new handle
try:
advertising_handle = next(
@@ -2125,10 +2139,6 @@ class Device(CompositeEventEmitter):
except StopIteration as exc:
raise RuntimeError("all valid advertising handles already in use") from exc
- # Instantiate default values
- if advertising_parameters is None:
- advertising_parameters = AdvertisingParameters()
-
# Use the device's random address if a random address is needed but none was
# provided.
if (
@@ -2222,7 +2232,7 @@ class Device(CompositeEventEmitter):
scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type: int = OwnAddressType.RANDOM,
filter_duplicates: bool = False,
- scanning_phys: Tuple[int, int] = (HCI_LE_1M_PHY, HCI_LE_CODED_PHY),
+ scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY],
) -> None:
# Check that the arguments are legal
if scan_interval < scan_window:
diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py
index c270f67..3f48f46 100644
--- a/bumble/pandora/host.py
+++ b/bumble/pandora/host.py
@@ -34,8 +34,11 @@ from bumble.device import (
DEVICE_DEFAULT_SCAN_INTERVAL,
DEVICE_DEFAULT_SCAN_WINDOW,
Advertisement,
+ AdvertisingParameters,
+ AdvertisingEventProperties,
AdvertisingType,
Device,
+ Phy,
)
from bumble.gatt import Service
from bumble.hci import (
@@ -47,6 +50,7 @@ from bumble.hci import (
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
from pandora.host_grpc_aio import HostServicer
+from pandora import host_pb2
from pandora.host_pb2 import (
NOT_CONNECTABLE,
NOT_DISCOVERABLE,
@@ -94,6 +98,25 @@ SECONDARY_PHY_MAP: Dict[int, SecondaryPhy] = {
3: SECONDARY_CODED,
}
+PRIMARY_PHY_TO_BUMBLE_PHY_MAP: Dict[PrimaryPhy, Phy] = {
+ PRIMARY_1M: Phy.LE_1M,
+ PRIMARY_CODED: Phy.LE_CODED,
+}
+
+SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
+ SECONDARY_NONE: Phy.LE_1M,
+ SECONDARY_1M: Phy.LE_1M,
+ SECONDARY_2M: Phy.LE_2M,
+ SECONDARY_CODED: Phy.LE_CODED,
+}
+
+OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = {
+ host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC,
+ host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM,
+ host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC,
+ host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM,
+}
+
class HostService(HostServicer):
waited_connections: Set[int]
@@ -281,10 +304,113 @@ class HostService(HostServicer):
async def Advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
- if not request.legacy:
- raise NotImplementedError(
- "TODO: add support for extended advertising in Bumble"
+ try:
+ if request.legacy:
+ async for rsp in self.legacy_advertise(request, context):
+ yield rsp
+ else:
+ async for rsp in self.extended_advertise(request, context):
+ yield rsp
+ finally:
+ pass
+
+ async def extended_advertise(
+ self, request: AdvertiseRequest, context: grpc.ServicerContext
+ ) -> AsyncGenerator[AdvertiseResponse, None]:
+ advertising_data = bytes(self.unpack_data_types(request.data))
+ scan_response_data = bytes(self.unpack_data_types(request.scan_response_data))
+ scannable = len(scan_response_data) != 0
+
+ advertising_event_properties = AdvertisingEventProperties(
+ is_connectable=request.connectable,
+ is_scannable=scannable,
+ is_directed=request.target is not None,
+ is_high_duty_cycle_directed_connectable=False,
+ is_legacy=False,
+ is_anonymous=False,
+ include_tx_power=False,
+ )
+
+ peer_address = Address.ANY
+ if request.target:
+ # Need to reverse bytes order since Bumble Address is using MSB.
+ target_bytes = bytes(reversed(request.target))
+ if request.target_variant() == "public":
+ peer_address = Address(target_bytes, Address.PUBLIC_DEVICE_ADDRESS)
+ else:
+ peer_address = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)
+
+ advertising_parameters = AdvertisingParameters(
+ advertising_event_properties=advertising_event_properties,
+ own_address_type=OWN_ADDRESS_MAP[request.own_address_type],
+ peer_address=peer_address,
+ primary_advertising_phy=PRIMARY_PHY_TO_BUMBLE_PHY_MAP[request.primary_phy],
+ secondary_advertising_phy=SECONDARY_PHY_TO_BUMBLE_PHY_MAP[
+ request.secondary_phy
+ ],
+ )
+ if advertising_interval := request.interval:
+ advertising_parameters.primary_advertising_interval_min = int(
+ advertising_interval
)
+ advertising_parameters.primary_advertising_interval_max = int(
+ advertising_interval
+ )
+ if interval_range := request.interval_range:
+ advertising_parameters.primary_advertising_interval_max += int(
+ interval_range
+ )
+
+ advertising_set = await self.device.create_advertising_set(
+ advertising_parameters=advertising_parameters,
+ advertising_data=advertising_data,
+ scan_response_data=scan_response_data,
+ )
+
+ pending_connection: asyncio.Future[
+ bumble.device.Connection
+ ] = asyncio.get_running_loop().create_future()
+
+ if request.connectable:
+
+ def on_connection(connection: bumble.device.Connection) -> None:
+ if (
+ connection.transport == BT_LE_TRANSPORT
+ and connection.role == BT_PERIPHERAL_ROLE
+ ):
+ pending_connection.set_result(connection)
+
+ self.device.on('connection', on_connection)
+
+ try:
+ # Advertise until RPC is canceled
+ while True:
+ if not advertising_set.enabled:
+ self.log.debug('Advertise (extended)')
+ await advertising_set.start()
+
+ if not request.connectable:
+ await asyncio.sleep(1)
+ continue
+
+ connection = await pending_connection
+ pending_connection = asyncio.get_running_loop().create_future()
+
+ cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
+ yield AdvertiseResponse(connection=Connection(cookie=cookie))
+
+ await asyncio.sleep(1)
+ finally:
+ try:
+ self.log.debug('Stop Advertise (extended)')
+ await advertising_set.stop()
+ await advertising_set.remove()
+ except Exception:
+ pass
+
+ async def legacy_advertise(
+ self, request: AdvertiseRequest, context: grpc.ServicerContext
+ ) -> AsyncGenerator[AdvertiseResponse, None]:
if advertising_interval := request.interval:
self.device.config.advertising_interval_min = int(advertising_interval)
self.device.config.advertising_interval_max = int(advertising_interval)
@@ -422,11 +548,16 @@ class HostService(HostServicer):
self, request: ScanRequest, context: grpc.ServicerContext
) -> AsyncGenerator[ScanningResponse, None]:
# TODO: modify `start_scanning` to accept floats instead of int for ms values
- if request.phys:
- raise NotImplementedError("TODO: add support for `request.phys`")
-
self.log.debug('Scan')
+ scanning_phys = []
+ if PRIMARY_1M in request.phys:
+ scanning_phys.append(int(Phy.LE_1M))
+ if PRIMARY_CODED in request.phys:
+ scanning_phys.append(int(Phy.LE_CODED))
+ if not scanning_phys:
+ scanning_phys = [int(Phy.LE_1M), int(Phy.LE_CODED)]
+
scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue()
handler = self.device.on('advertisement', scan_queue.put_nowait)
await self.device.start_scanning(
@@ -439,6 +570,7 @@ class HostService(HostServicer):
scan_window=int(request.window)
if request.window
else DEVICE_DEFAULT_SCAN_WINDOW,
+ scanning_phys=scanning_phys,
)
try: