diff options
author | Ben Lawson <benlawson@google.com> | 2024-02-21 15:52:13 -0800 |
---|---|---|
committer | Ben Lawson <benlawson@google.com> | 2024-03-07 16:18:49 -0800 |
commit | 256044a7893375748e0fd535f1acbd205c6dc97f (patch) | |
tree | 2036dceb24c15da57b6eed88ba58e06740420da1 | |
parent | de8f3d9c1e682fb8076f3785c8011bb1c0c2b1bd (diff) | |
download | bumble-256044a7893375748e0fd535f1acbd205c6dc97f.tar.gz |
Implement Pandora extended advertising
Support setting the PHY of Pandora scans.
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | bumble/device.py | 20 | ||||
-rw-r--r-- | bumble/pandora/host.py | 144 |
3 files changed, 156 insertions, 11 deletions
@@ -9,6 +9,9 @@ __pycache__ # generated by setuptools_scm bumble/_version.py .vscode/launch.json +.vscode/settings.json /.idea venv/ .venv/ +# snoop logs +out/ diff --git a/bumble/device.py b/bumble/device.py index 01a5cbe..48f9d58 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -2112,6 +2112,20 @@ class Device(CompositeEventEmitter): Returns: An AdvertisingSet instance. """ + # Instantiate default values + if advertising_parameters is None: + advertising_parameters = AdvertisingParameters() + + if ( + not advertising_parameters.advertising_event_properties.is_legacy + and advertising_data + and scan_response_data + ): + raise ValueError( + "Extended advertisements can't have both data and scan \ + response data" + ) + # Allocate a new handle try: advertising_handle = next( @@ -2125,10 +2139,6 @@ class Device(CompositeEventEmitter): except StopIteration as exc: raise RuntimeError("all valid advertising handles already in use") from exc - # Instantiate default values - if advertising_parameters is None: - advertising_parameters = AdvertisingParameters() - # Use the device's random address if a random address is needed but none was # provided. if ( @@ -2222,7 +2232,7 @@ class Device(CompositeEventEmitter): scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms own_address_type: int = OwnAddressType.RANDOM, filter_duplicates: bool = False, - scanning_phys: Tuple[int, int] = (HCI_LE_1M_PHY, HCI_LE_CODED_PHY), + scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY], ) -> None: # Check that the arguments are legal if scan_interval < scan_window: diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py index c270f67..3f48f46 100644 --- a/bumble/pandora/host.py +++ b/bumble/pandora/host.py @@ -34,8 +34,11 @@ from bumble.device import ( DEVICE_DEFAULT_SCAN_INTERVAL, DEVICE_DEFAULT_SCAN_WINDOW, Advertisement, + AdvertisingParameters, + AdvertisingEventProperties, AdvertisingType, Device, + Phy, ) from bumble.gatt import Service from bumble.hci import ( @@ -47,6 +50,7 @@ from bumble.hci import ( from google.protobuf import any_pb2 # pytype: disable=pyi-error from google.protobuf import empty_pb2 # pytype: disable=pyi-error from pandora.host_grpc_aio import HostServicer +from pandora import host_pb2 from pandora.host_pb2 import ( NOT_CONNECTABLE, NOT_DISCOVERABLE, @@ -94,6 +98,25 @@ SECONDARY_PHY_MAP: Dict[int, SecondaryPhy] = { 3: SECONDARY_CODED, } +PRIMARY_PHY_TO_BUMBLE_PHY_MAP: Dict[PrimaryPhy, Phy] = { + PRIMARY_1M: Phy.LE_1M, + PRIMARY_CODED: Phy.LE_CODED, +} + +SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = { + SECONDARY_NONE: Phy.LE_1M, + SECONDARY_1M: Phy.LE_1M, + SECONDARY_2M: Phy.LE_2M, + SECONDARY_CODED: Phy.LE_CODED, +} + +OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = { + host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC, + host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM, + host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC, + host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM, +} + class HostService(HostServicer): waited_connections: Set[int] @@ -281,10 +304,113 @@ class HostService(HostServicer): async def Advertise( self, request: AdvertiseRequest, context: grpc.ServicerContext ) -> AsyncGenerator[AdvertiseResponse, None]: - if not request.legacy: - raise NotImplementedError( - "TODO: add support for extended advertising in Bumble" + try: + if request.legacy: + async for rsp in self.legacy_advertise(request, context): + yield rsp + else: + async for rsp in self.extended_advertise(request, context): + yield rsp + finally: + pass + + async def extended_advertise( + self, request: AdvertiseRequest, context: grpc.ServicerContext + ) -> AsyncGenerator[AdvertiseResponse, None]: + advertising_data = bytes(self.unpack_data_types(request.data)) + scan_response_data = bytes(self.unpack_data_types(request.scan_response_data)) + scannable = len(scan_response_data) != 0 + + advertising_event_properties = AdvertisingEventProperties( + is_connectable=request.connectable, + is_scannable=scannable, + is_directed=request.target is not None, + is_high_duty_cycle_directed_connectable=False, + is_legacy=False, + is_anonymous=False, + include_tx_power=False, + ) + + peer_address = Address.ANY + if request.target: + # Need to reverse bytes order since Bumble Address is using MSB. + target_bytes = bytes(reversed(request.target)) + if request.target_variant() == "public": + peer_address = Address(target_bytes, Address.PUBLIC_DEVICE_ADDRESS) + else: + peer_address = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS) + + advertising_parameters = AdvertisingParameters( + advertising_event_properties=advertising_event_properties, + own_address_type=OWN_ADDRESS_MAP[request.own_address_type], + peer_address=peer_address, + primary_advertising_phy=PRIMARY_PHY_TO_BUMBLE_PHY_MAP[request.primary_phy], + secondary_advertising_phy=SECONDARY_PHY_TO_BUMBLE_PHY_MAP[ + request.secondary_phy + ], + ) + if advertising_interval := request.interval: + advertising_parameters.primary_advertising_interval_min = int( + advertising_interval ) + advertising_parameters.primary_advertising_interval_max = int( + advertising_interval + ) + if interval_range := request.interval_range: + advertising_parameters.primary_advertising_interval_max += int( + interval_range + ) + + advertising_set = await self.device.create_advertising_set( + advertising_parameters=advertising_parameters, + advertising_data=advertising_data, + scan_response_data=scan_response_data, + ) + + pending_connection: asyncio.Future[ + bumble.device.Connection + ] = asyncio.get_running_loop().create_future() + + if request.connectable: + + def on_connection(connection: bumble.device.Connection) -> None: + if ( + connection.transport == BT_LE_TRANSPORT + and connection.role == BT_PERIPHERAL_ROLE + ): + pending_connection.set_result(connection) + + self.device.on('connection', on_connection) + + try: + # Advertise until RPC is canceled + while True: + if not advertising_set.enabled: + self.log.debug('Advertise (extended)') + await advertising_set.start() + + if not request.connectable: + await asyncio.sleep(1) + continue + + connection = await pending_connection + pending_connection = asyncio.get_running_loop().create_future() + + cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big')) + yield AdvertiseResponse(connection=Connection(cookie=cookie)) + + await asyncio.sleep(1) + finally: + try: + self.log.debug('Stop Advertise (extended)') + await advertising_set.stop() + await advertising_set.remove() + except Exception: + pass + + async def legacy_advertise( + self, request: AdvertiseRequest, context: grpc.ServicerContext + ) -> AsyncGenerator[AdvertiseResponse, None]: if advertising_interval := request.interval: self.device.config.advertising_interval_min = int(advertising_interval) self.device.config.advertising_interval_max = int(advertising_interval) @@ -422,11 +548,16 @@ class HostService(HostServicer): self, request: ScanRequest, context: grpc.ServicerContext ) -> AsyncGenerator[ScanningResponse, None]: # TODO: modify `start_scanning` to accept floats instead of int for ms values - if request.phys: - raise NotImplementedError("TODO: add support for `request.phys`") - self.log.debug('Scan') + scanning_phys = [] + if PRIMARY_1M in request.phys: + scanning_phys.append(int(Phy.LE_1M)) + if PRIMARY_CODED in request.phys: + scanning_phys.append(int(Phy.LE_CODED)) + if not scanning_phys: + scanning_phys = [int(Phy.LE_1M), int(Phy.LE_CODED)] + scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue() handler = self.device.on('advertisement', scan_queue.put_nowait) await self.device.start_scanning( @@ -439,6 +570,7 @@ class HostService(HostServicer): scan_window=int(request.window) if request.window else DEVICE_DEFAULT_SCAN_WINDOW, + scanning_phys=scanning_phys, ) try: |