diff options
Diffstat (limited to 'bumble/device.py')
-rw-r--r-- | bumble/device.py | 1620 |
1 files changed, 1340 insertions, 280 deletions
diff --git a/bumble/device.py b/bumble/device.py index 7f11012..48f9d58 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -21,8 +21,10 @@ import functools import json import asyncio import logging -from contextlib import asynccontextmanager, AsyncExitStack -from dataclasses import dataclass +import secrets +from contextlib import asynccontextmanager, AsyncExitStack, closing +from dataclasses import dataclass, field +from collections.abc import Iterable from typing import ( Any, Callable, @@ -32,12 +34,15 @@ from typing import ( Optional, Tuple, Type, + TypeVar, Union, cast, overload, TYPE_CHECKING, ) +from pyee import EventEmitter + from .colors import color from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU from .gatt import Characteristic, Descriptor, Service @@ -45,6 +50,7 @@ from .hci import ( HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_CENTRAL_ROLE, + HCI_PERIPHERAL_ROLE, HCI_COMMAND_STATUS_PENDING, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, HCI_DISPLAY_YES_NO_IO_CAPABILITY, @@ -56,12 +62,8 @@ from .hci import ( HCI_LE_1M_PHY, HCI_LE_1M_PHY_BIT, HCI_LE_2M_PHY, - HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE, - HCI_LE_CLEAR_RESOLVING_LIST_COMMAND, HCI_LE_CODED_PHY, HCI_LE_CODED_PHY_BIT, - HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE, - HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE, HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND, HCI_LE_RAND_COMMAND, HCI_LE_READ_PHY_COMMAND, @@ -75,37 +77,52 @@ from .hci import ( HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, HCI_SUCCESS, HCI_WRITE_LE_HOST_SUPPORT_COMMAND, - Address, HCI_Accept_Connection_Request_Command, HCI_Authentication_Requested_Command, HCI_Command_Status_Event, HCI_Constant, HCI_Create_Connection_Cancel_Command, HCI_Create_Connection_Command, + HCI_Connection_Complete_Event, HCI_Disconnect_Command, HCI_Encryption_Change_Event, HCI_Error, HCI_IO_Capability_Request_Reply_Command, HCI_Inquiry_Cancel_Command, HCI_Inquiry_Command, + HCI_IsoDataPacket, + HCI_LE_Accept_CIS_Request_Command, HCI_LE_Add_Device_To_Resolving_List_Command, HCI_LE_Advertising_Report_Event, HCI_LE_Clear_Resolving_List_Command, HCI_LE_Connection_Update_Command, HCI_LE_Create_Connection_Cancel_Command, HCI_LE_Create_Connection_Command, + HCI_LE_Create_CIS_Command, HCI_LE_Enable_Encryption_Command, HCI_LE_Extended_Advertising_Report_Event, HCI_LE_Extended_Create_Connection_Command, HCI_LE_Rand_Command, HCI_LE_Read_PHY_Command, + HCI_LE_Read_Remote_Features_Command, + HCI_LE_Reject_CIS_Request_Command, + HCI_LE_Remove_Advertising_Set_Command, HCI_LE_Set_Address_Resolution_Enable_Command, HCI_LE_Set_Advertising_Data_Command, HCI_LE_Set_Advertising_Enable_Command, HCI_LE_Set_Advertising_Parameters_Command, + HCI_LE_Set_Advertising_Set_Random_Address_Command, + HCI_LE_Set_CIG_Parameters_Command, + HCI_LE_Set_Data_Length_Command, HCI_LE_Set_Default_PHY_Command, HCI_LE_Set_Extended_Scan_Enable_Command, HCI_LE_Set_Extended_Scan_Parameters_Command, + HCI_LE_Set_Extended_Scan_Response_Data_Command, + HCI_LE_Set_Extended_Advertising_Data_Command, + HCI_LE_Set_Extended_Advertising_Enable_Command, + HCI_LE_Set_Extended_Advertising_Parameters_Command, + HCI_LE_Set_Host_Feature_Command, + HCI_LE_Set_Periodic_Advertising_Enable_Command, HCI_LE_Set_PHY_Command, HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Scan_Enable_Command, @@ -120,6 +137,7 @@ from .hci import ( HCI_Switch_Role_Command, HCI_Set_Connection_Encryption_Command, HCI_StatusError, + HCI_SynchronousDataPacket, HCI_User_Confirmation_Request_Negative_Reply_Command, HCI_User_Confirmation_Request_Reply_Command, HCI_User_Passkey_Request_Negative_Reply_Command, @@ -132,7 +150,11 @@ from .hci import ( HCI_Write_Scan_Enable_Command, HCI_Write_Secure_Connections_Host_Support_Command, HCI_Write_Simple_Pairing_Mode_Command, + Address, OwnAddressType, + LeFeature, + LeFeatureMask, + Phy, phy_list_to_bits, ) from .host import Host @@ -151,9 +173,11 @@ from .core import ( from .utils import ( AsyncRunner, CompositeEventEmitter, + EventWatcher, setup_event_forwarding, composite_listener, deprecated, + experimental, ) from .keys import ( KeyStore, @@ -188,6 +212,8 @@ DEVICE_MIN_SCAN_WINDOW = 25 DEVICE_MAX_SCAN_WINDOW = 10240 DEVICE_MIN_LE_RSSI = -127 DEVICE_MAX_LE_RSSI = 20 +DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00 +DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00' DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms @@ -211,10 +237,16 @@ DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH = 0 # ms DEVICE_DEFAULT_L2CAP_COC_MTU = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU DEVICE_DEFAULT_L2CAP_COC_MPS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS +DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE +) # fmt: on # pylint: enable=line-too-long +# As specified in 7.8.56 LE Set Extended Advertising Enable command +DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION = 1.28 + # ----------------------------------------------------------------------------- # Classes @@ -222,16 +254,40 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN # ----------------------------------------------------------------------------- +@dataclass class Advertisement: + # Attributes address: Address - - TX_POWER_NOT_AVAILABLE = ( + rssi: int = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE + is_legacy: bool = False + is_anonymous: bool = False + is_connectable: bool = False + is_directed: bool = False + is_scannable: bool = False + is_scan_response: bool = False + is_complete: bool = True + is_truncated: bool = False + primary_phy: int = 0 + secondary_phy: int = 0 + tx_power: int = ( HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE ) - RSSI_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE + sid: int = 0 + data_bytes: bytes = b'' + + # Constants + TX_POWER_NOT_AVAILABLE: ClassVar[ + int + ] = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE + RSSI_NOT_AVAILABLE: ClassVar[ + int + ] = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE + + def __post_init__(self) -> None: + self.data = AdvertisingData.from_bytes(self.data_bytes) @classmethod - def from_advertising_report(cls, report): + def from_advertising_report(cls, report) -> Optional[Advertisement]: if isinstance(report, HCI_LE_Advertising_Report_Event.Report): return LegacyAdvertisement.from_advertising_report(report) @@ -240,41 +296,6 @@ class Advertisement: return None - # pylint: disable=line-too-long - def __init__( - self, - address, - rssi=HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE, - is_legacy=False, - is_anonymous=False, - is_connectable=False, - is_directed=False, - is_scannable=False, - is_scan_response=False, - is_complete=True, - is_truncated=False, - primary_phy=0, - secondary_phy=0, - tx_power=HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE, - sid=0, - data=b'', - ): - self.address = address - self.rssi = rssi - self.is_legacy = is_legacy - self.is_anonymous = is_anonymous - self.is_connectable = is_connectable - self.is_directed = is_directed - self.is_scannable = is_scannable - self.is_scan_response = is_scan_response - self.is_complete = is_complete - self.is_truncated = is_truncated - self.primary_phy = primary_phy - self.secondary_phy = secondary_phy - self.tx_power = tx_power - self.sid = sid - self.data = AdvertisingData.from_bytes(data) - # ----------------------------------------------------------------------------- class LegacyAdvertisement(Advertisement): @@ -298,7 +319,7 @@ class LegacyAdvertisement(Advertisement): ), is_scan_response=report.event_type == HCI_LE_Advertising_Report_Event.SCAN_RSP, - data=report.data, + data_bytes=report.data, ) @@ -323,7 +344,7 @@ class ExtendedAdvertisement(Advertisement): secondary_phy = report.secondary_phy, tx_power = report.tx_power, sid = report.advertising_sid, - data = report.data + data_bytes = report.data ) # fmt: on @@ -384,7 +405,7 @@ class AdvertisingType(IntEnum): # fmt: on @property - def has_data(self): + def has_data(self) -> bool: return self in ( AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, AdvertisingType.UNDIRECTED_SCANNABLE, @@ -392,7 +413,7 @@ class AdvertisingType(IntEnum): ) @property - def is_connectable(self): + def is_connectable(self) -> bool: return self in ( AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, @@ -400,19 +421,369 @@ class AdvertisingType(IntEnum): ) @property - def is_scannable(self): + def is_scannable(self) -> bool: return self in ( AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, AdvertisingType.UNDIRECTED_SCANNABLE, ) @property - def is_directed(self): + def is_directed(self) -> bool: return self in ( AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, ) + @property + def is_high_duty_cycle_directed_connectable(self): + return self == AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY + + +# ----------------------------------------------------------------------------- +@dataclass +class LegacyAdvertiser: + device: Device + advertising_type: AdvertisingType + own_address_type: OwnAddressType + peer_address: Address + auto_restart: bool + + async def start(self) -> None: + # Set/update the advertising data if the advertising type allows it + if self.advertising_type.has_data: + await self.device.send_command( + HCI_LE_Set_Advertising_Data_Command( + advertising_data=self.device.advertising_data + ), + check_result=True, + ) + + # Set/update the scan response data if the advertising is scannable + if self.advertising_type.is_scannable: + await self.device.send_command( + HCI_LE_Set_Scan_Response_Data_Command( + scan_response_data=self.device.scan_response_data + ), + check_result=True, + ) + + # Set the advertising parameters + await self.device.send_command( + HCI_LE_Set_Advertising_Parameters_Command( + advertising_interval_min=self.device.advertising_interval_min, + advertising_interval_max=self.device.advertising_interval_max, + advertising_type=int(self.advertising_type), + own_address_type=self.own_address_type, + peer_address_type=self.peer_address.address_type, + peer_address=self.peer_address, + advertising_channel_map=7, + advertising_filter_policy=0, + ), + check_result=True, + ) + + # Enable advertising + await self.device.send_command( + HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), + check_result=True, + ) + + async def stop(self) -> None: + # Disable advertising + await self.device.send_command( + HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), + check_result=True, + ) + + +# ----------------------------------------------------------------------------- +@dataclass +class AdvertisingEventProperties: + is_connectable: bool = True + is_scannable: bool = False + is_directed: bool = False + is_high_duty_cycle_directed_connectable: bool = False + is_legacy: bool = False + is_anonymous: bool = False + include_tx_power: bool = False + + def __int__(self) -> int: + properties = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(0) + ) + if self.is_connectable: + properties |= properties.CONNECTABLE_ADVERTISING + if self.is_scannable: + properties |= properties.SCANNABLE_ADVERTISING + if self.is_directed: + properties |= properties.DIRECTED_ADVERTISING + if self.is_high_duty_cycle_directed_connectable: + properties |= properties.HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING + if self.is_legacy: + properties |= properties.USE_LEGACY_ADVERTISING_PDUS + if self.is_anonymous: + properties |= properties.ANONYMOUS_ADVERTISING + if self.include_tx_power: + properties |= properties.INCLUDE_TX_POWER + + return int(properties) + + @classmethod + def from_advertising_type( + cls: Type[AdvertisingEventProperties], + advertising_type: AdvertisingType, + ) -> AdvertisingEventProperties: + return cls( + is_connectable=advertising_type.is_connectable, + is_scannable=advertising_type.is_scannable, + is_directed=advertising_type.is_directed, + is_high_duty_cycle_directed_connectable=advertising_type.is_high_duty_cycle_directed_connectable, + is_legacy=True, + is_anonymous=False, + include_tx_power=False, + ) + + +# ----------------------------------------------------------------------------- +# TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10 +AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap + + +# ----------------------------------------------------------------------------- +@dataclass +class AdvertisingParameters: + # pylint: disable=line-too-long + advertising_event_properties: AdvertisingEventProperties = field( + default_factory=AdvertisingEventProperties + ) + primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL + primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL + primary_advertising_channel_map: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap = ( + AdvertisingChannelMap.CHANNEL_37 + | AdvertisingChannelMap.CHANNEL_38 + | AdvertisingChannelMap.CHANNEL_39 + ) + own_address_type: OwnAddressType = OwnAddressType.RANDOM + peer_address: Address = Address.ANY + advertising_filter_policy: int = 0 + advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER + primary_advertising_phy: Phy = Phy.LE_1M + secondary_advertising_max_skip: int = 0 + secondary_advertising_phy: Phy = Phy.LE_1M + advertising_sid: int = 0 + enable_scan_request_notifications: bool = False + primary_advertising_phy_options: int = 0 + secondary_advertising_phy_options: int = 0 + + +# ----------------------------------------------------------------------------- +@dataclass +class PeriodicAdvertisingParameters: + # TODO implement this class + pass + + +# ----------------------------------------------------------------------------- +@dataclass +class AdvertisingSet(EventEmitter): + device: Device + advertising_handle: int + auto_restart: bool + random_address: Optional[Address] + advertising_parameters: AdvertisingParameters + advertising_data: bytes + scan_response_data: bytes + periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] + periodic_advertising_data: bytes + selected_tx_power: int = 0 + enabled: bool = False + + def __post_init__(self) -> None: + super().__init__() + + async def set_advertising_parameters( + self, advertising_parameters: AdvertisingParameters + ) -> None: + # Compliance check + if ( + not advertising_parameters.advertising_event_properties.is_legacy + and advertising_parameters.advertising_event_properties.is_connectable + and advertising_parameters.advertising_event_properties.is_scannable + ): + logger.warning( + "non-legacy extended advertising event properties may not be both " + "connectable and scannable" + ) + + response = await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Parameters_Command( + advertising_handle=self.advertising_handle, + advertising_event_properties=int( + advertising_parameters.advertising_event_properties + ), + primary_advertising_interval_min=( + int(advertising_parameters.primary_advertising_interval_min / 0.625) + ), + primary_advertising_interval_max=( + int(advertising_parameters.primary_advertising_interval_min / 0.625) + ), + primary_advertising_channel_map=int( + advertising_parameters.primary_advertising_channel_map + ), + own_address_type=advertising_parameters.own_address_type, + peer_address_type=advertising_parameters.peer_address.address_type, + peer_address=advertising_parameters.peer_address, + advertising_tx_power=advertising_parameters.advertising_tx_power, + advertising_filter_policy=( + advertising_parameters.advertising_filter_policy + ), + primary_advertising_phy=advertising_parameters.primary_advertising_phy, + secondary_advertising_max_skip=( + advertising_parameters.secondary_advertising_max_skip + ), + secondary_advertising_phy=( + advertising_parameters.secondary_advertising_phy + ), + advertising_sid=advertising_parameters.advertising_sid, + scan_request_notification_enable=( + 1 if advertising_parameters.enable_scan_request_notifications else 0 + ), + ), + check_result=True, + ) + self.selected_tx_power = response.return_parameters.selected_tx_power + self.advertising_parameters = advertising_parameters + + async def set_advertising_data(self, advertising_data: bytes) -> None: + # pylint: disable=line-too-long + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Data_Command( + advertising_handle=self.advertising_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, + advertising_data=advertising_data, + ), + check_result=True, + ) + self.advertising_data = advertising_data + + async def set_scan_response_data(self, scan_response_data: bytes) -> None: + # pylint: disable=line-too-long + if ( + scan_response_data + and not self.advertising_parameters.advertising_event_properties.is_scannable + ): + logger.warning( + "ignoring attempt to set non-empty scan response data on non-scannable " + "advertising set" + ) + return + + await self.device.send_command( + HCI_LE_Set_Extended_Scan_Response_Data_Command( + advertising_handle=self.advertising_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, + scan_response_data=scan_response_data, + ), + check_result=True, + ) + self.scan_response_data = scan_response_data + + async def set_periodic_advertising_parameters( + self, advertising_parameters: PeriodicAdvertisingParameters + ) -> None: + # TODO: send command + self.periodic_advertising_parameters = advertising_parameters + + async def set_periodic_advertising_data(self, advertising_data: bytes) -> None: + # TODO: send command + self.periodic_advertising_data = advertising_data + + async def set_random_address(self, random_address: Address) -> None: + await self.device.send_command( + HCI_LE_Set_Advertising_Set_Random_Address_Command( + advertising_handle=self.advertising_handle, + random_address=(random_address or self.device.random_address), + ), + check_result=True, + ) + + async def start( + self, duration: float = 0.0, max_advertising_events: int = 0 + ) -> None: + """ + Start advertising. + + Args: + duration: How long to advertise for, in seconds. Use 0 (the default) for + an unlimited duration, unless this advertising set is a High Duty Cycle + Directed Advertisement type. + max_advertising_events: Maximum number of events to advertise for. Use 0 + (the default) for an unlimited number of advertisements. + """ + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=1, + advertising_handles=[self.advertising_handle], + durations=[round(duration * 100)], + max_extended_advertising_events=[max_advertising_events], + ), + check_result=True, + ) + self.enabled = True + + self.emit('start') + + async def start_periodic(self, include_adi: bool = False) -> None: + await self.device.send_command( + HCI_LE_Set_Periodic_Advertising_Enable_Command( + enable=1 | (2 if include_adi else 0), + advertising_handles=self.advertising_handle, + ), + check_result=True, + ) + + self.emit('start_periodic') + + async def stop(self) -> None: + await self.device.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=0, + advertising_handles=[self.advertising_handle], + durations=[0], + max_extended_advertising_events=[0], + ), + check_result=True, + ) + self.enabled = False + + self.emit('stop') + + async def stop_periodic(self) -> None: + await self.device.send_command( + HCI_LE_Set_Periodic_Advertising_Enable_Command( + enable=0, + advertising_handles=self.advertising_handle, + ), + check_result=True, + ) + + self.emit('stop_periodic') + + async def remove(self) -> None: + await self.device.send_command( + HCI_LE_Remove_Advertising_Set_Command( + advertising_handle=self.advertising_handle + ), + check_result=True, + ) + del self.device.extended_advertising_sets[self.advertising_handle] + + def on_termination(self, status: int) -> None: + self.enabled = False + self.emit('termination', status) + # ----------------------------------------------------------------------------- class LePhyOptions: @@ -429,8 +800,11 @@ class LePhyOptions: # ----------------------------------------------------------------------------- +_PROXY_CLASS = TypeVar('_PROXY_CLASS', bound=gatt_client.ProfileServiceProxy) + + class Peer: - def __init__(self, connection): + def __init__(self, connection: Connection) -> None: self.connection = connection # Create a GATT client for the connection @@ -438,77 +812,113 @@ class Peer: connection.gatt_client = self.gatt_client @property - def services(self): + def services(self) -> List[gatt_client.ServiceProxy]: return self.gatt_client.services - async def request_mtu(self, mtu): + async def request_mtu(self, mtu: int) -> int: mtu = await self.gatt_client.request_mtu(mtu) self.connection.emit('connection_att_mtu_update') return mtu - async def discover_service(self, uuid): + async def discover_service( + self, uuid: Union[core.UUID, str] + ) -> List[gatt_client.ServiceProxy]: return await self.gatt_client.discover_service(uuid) - async def discover_services(self, uuids=()): + async def discover_services( + self, uuids: Iterable[core.UUID] = () + ) -> List[gatt_client.ServiceProxy]: return await self.gatt_client.discover_services(uuids) - async def discover_included_services(self, service): + async def discover_included_services( + self, service: gatt_client.ServiceProxy + ) -> List[gatt_client.ServiceProxy]: return await self.gatt_client.discover_included_services(service) - async def discover_characteristics(self, uuids=(), service=None): + async def discover_characteristics( + self, + uuids: Iterable[Union[core.UUID, str]] = (), + service: Optional[gatt_client.ServiceProxy] = None, + ) -> List[gatt_client.CharacteristicProxy]: return await self.gatt_client.discover_characteristics( uuids=uuids, service=service ) async def discover_descriptors( - self, characteristic=None, start_handle=None, end_handle=None + self, + characteristic: Optional[gatt_client.CharacteristicProxy] = None, + start_handle: Optional[int] = None, + end_handle: Optional[int] = None, ): return await self.gatt_client.discover_descriptors( characteristic, start_handle, end_handle ) - async def discover_attributes(self): + async def discover_attributes(self) -> List[gatt_client.AttributeProxy]: return await self.gatt_client.discover_attributes() - async def subscribe(self, characteristic, subscriber=None, prefer_notify=True): + async def subscribe( + self, + characteristic: gatt_client.CharacteristicProxy, + subscriber: Optional[Callable[[bytes], Any]] = None, + prefer_notify: bool = True, + ) -> None: return await self.gatt_client.subscribe( characteristic, subscriber, prefer_notify ) - async def unsubscribe(self, characteristic, subscriber=None): + async def unsubscribe( + self, + characteristic: gatt_client.CharacteristicProxy, + subscriber: Optional[Callable[[bytes], Any]] = None, + ) -> None: return await self.gatt_client.unsubscribe(characteristic, subscriber) - async def read_value(self, attribute): + async def read_value( + self, attribute: Union[int, gatt_client.AttributeProxy] + ) -> bytes: return await self.gatt_client.read_value(attribute) - async def write_value(self, attribute, value, with_response=False): + async def write_value( + self, + attribute: Union[int, gatt_client.AttributeProxy], + value: bytes, + with_response: bool = False, + ) -> None: return await self.gatt_client.write_value(attribute, value, with_response) - async def read_characteristics_by_uuid(self, uuid, service=None): + async def read_characteristics_by_uuid( + self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None + ) -> List[bytes]: return await self.gatt_client.read_characteristics_by_uuid(uuid, service) - def get_services_by_uuid(self, uuid): + def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]: return self.gatt_client.get_services_by_uuid(uuid) - def get_characteristics_by_uuid(self, uuid, service=None): + def get_characteristics_by_uuid( + self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None + ) -> List[gatt_client.CharacteristicProxy]: return self.gatt_client.get_characteristics_by_uuid(uuid, service) - def create_service_proxy(self, proxy_class): - return proxy_class.from_client(self.gatt_client) + def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS: + return cast(_PROXY_CLASS, proxy_class.from_client(self.gatt_client)) - async def discover_service_and_create_proxy(self, proxy_class): + async def discover_service_and_create_proxy( + self, proxy_class: Type[_PROXY_CLASS] + ) -> Optional[_PROXY_CLASS]: # Discover the first matching service and its characteristics services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID) if services: service = services[0] await service.discover_characteristics() return self.create_service_proxy(proxy_class) + return None - async def sustain(self, timeout=None): + async def sustain(self, timeout: Optional[float] = None) -> None: await self.connection.sustain(timeout) # [Classic only] - async def request_name(self): + async def request_name(self) -> str: return await self.connection.request_remote_name() async def __aenter__(self): @@ -521,7 +931,7 @@ class Peer: async def __aexit__(self, exc_type, exc_value, traceback): pass - def __str__(self): + def __str__(self) -> str: return f'{self.connection.peer_address} as {self.connection.role_name}' @@ -541,6 +951,46 @@ ConnectionParametersPreferences.default = ConnectionParametersPreferences() # ----------------------------------------------------------------------------- +@dataclass +class ScoLink(CompositeEventEmitter): + device: Device + acl_connection: Connection + handle: int + link_type: int + + def __post_init__(self): + super().__init__() + + async def disconnect( + self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + ) -> None: + await self.device.disconnect(self, reason) + + +# ----------------------------------------------------------------------------- +@dataclass +class CisLink(CompositeEventEmitter): + class State(IntEnum): + PENDING = 0 + ESTABLISHED = 1 + + device: Device + acl_connection: Connection # Based ACL connection + handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events) + cis_id: int # CIS ID assigned by Central device + cig_id: int # CIG ID assigned by Central device + state: State = State.PENDING + + def __post_init__(self): + super().__init__() + + async def disconnect( + self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + ) -> None: + await self.device.disconnect(self, reason) + + +# ----------------------------------------------------------------------------- class Connection(CompositeEventEmitter): device: Device handle: int @@ -548,6 +998,7 @@ class Connection(CompositeEventEmitter): self_address: Address peer_address: Address peer_resolvable_address: Optional[Address] + peer_le_features: Optional[LeFeatureMask] role: int encryption: int authenticated: bool @@ -621,6 +1072,7 @@ class Connection(CompositeEventEmitter): ) # By default, use the device's shared server self.pairing_peer_io_capability = None self.pairing_peer_authentication_requirements = None + self.peer_le_features = None # [Classic only] @classmethod @@ -721,7 +1173,7 @@ class Connection(CompositeEventEmitter): async def switch_role(self, role: int) -> None: return await self.device.switch_role(self, role) - async def sustain(self, timeout=None): + async def sustain(self, timeout: Optional[float] = None) -> None: """Idles the current task waiting for a disconnect or timeout""" abort = asyncio.get_running_loop().create_future() @@ -736,6 +1188,9 @@ class Connection(CompositeEventEmitter): self.remove_listener('disconnection', abort.set_result) self.remove_listener('disconnection_failure', abort.set_exception) + async def set_data_length(self, tx_octets, tx_time) -> None: + return await self.device.set_data_length(self, tx_octets, tx_time) + async def update_parameters( self, connection_interval_min, @@ -766,6 +1221,15 @@ class Connection(CompositeEventEmitter): async def request_remote_name(self): return await self.device.request_remote_name(self) + async def get_remote_le_features(self) -> LeFeatureMask: + """[LE Only] Reads remote LE supported features. + + Returns: + LE features supported by the remote device. + """ + self.peer_le_features = await self.device.get_remote_le_features(self) + return self.peer_le_features + async def __aenter__(self): return self @@ -782,7 +1246,8 @@ class Connection(CompositeEventEmitter): return ( f'Connection(handle=0x{self.handle:04X}, ' f'role={self.role_name}, ' - f'address={self.peer_address})' + f'self_address={self.self_address}, ' + f'peer_address={self.peer_address})' ) @@ -815,6 +1280,7 @@ class DeviceConfiguration: self.keystore = None self.gatt_services: List[Dict[str, Any]] = [] self.address_resolution_offload = False + self.cis_enabled = False def load_from_dict(self, config: Dict[str, Any]) -> None: # Load simple properties @@ -850,17 +1316,21 @@ class DeviceConfiguration: self.address_resolution_offload = config.get( 'address_resolution_offload', self.address_resolution_offload ) + self.cis_enabled = config.get('cis_enabled', self.cis_enabled) # Load or synthesize an IRK irk = config.get('irk') if irk: self.irk = bytes.fromhex(irk) - else: + elif self.address != Address(DEVICE_DEFAULT_ADDRESS): # Construct an IRK from the address bytes # NOTE: this is not secure, but will always give the same IRK for the same # address address_bytes = bytes(self.address) self.irk = (address_bytes * 3)[:16] + else: + # Fallback - when both IRK and address are not set, randomly generate an IRK. + self.irk = secrets.token_bytes(16) # Load advertising data advertising_data = config.get('advertising_data') @@ -890,7 +1360,7 @@ def with_connection_from_handle(function): @functools.wraps(function) def wrapper(self, connection_handle, *args, **kwargs): if (connection := self.lookup_connection(connection_handle)) is None: - raise ValueError(f"no connection for handle: 0x{connection_handle:04x}") + raise ValueError(f'no connection for handle: 0x{connection_handle:04x}') return function(self, connection, *args, **kwargs) return wrapper @@ -956,6 +1426,10 @@ class Device(CompositeEventEmitter): ] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] config: DeviceConfiguration + legacy_advertiser: Optional[LegacyAdvertiser] + sco_links: Dict[int, ScoLink] + cis_links: Dict[int, CisLink] + _pending_cis: Dict[int, Tuple[int, int]] @composite_listener class Listener: @@ -1030,10 +1504,7 @@ class Device(CompositeEventEmitter): self._host = None self.powered_on = False - self.advertising = False - self.advertising_type = None self.auto_restart_inquiry = True - self.auto_restart_advertising = False self.command_timeout = 10 # seconds self.gatt_server = gatt_server.Server(self) self.sdp_server = sdp.Server(self) @@ -1048,6 +1519,9 @@ class Device(CompositeEventEmitter): self.disconnecting = False self.connections = {} # Connections, by connection handle self.pending_connections = {} # Connections, by BD address (BR/EDR only) + self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only) + self.cis_links = {} # CisLinks, by connection handle (LE only) + self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle self.classic_enabled = False self.inquiry_response = None self.address_resolver = None @@ -1056,7 +1530,6 @@ class Device(CompositeEventEmitter): } # Futures, by BD address OR [Futures] for Address.ANY # Own address type cache - self.advertising_own_address_type = None self.connect_own_address_type = None # Use the initial config or a default @@ -1067,15 +1540,12 @@ class Device(CompositeEventEmitter): self.name = config.name self.random_address = config.address self.class_of_device = config.class_of_device - self.scan_response_data = config.scan_response_data - self.advertising_data = config.advertising_data - self.advertising_interval_min = config.advertising_interval_min - self.advertising_interval_max = config.advertising_interval_max self.keystore = None self.irk = config.irk self.le_enabled = config.le_enabled self.classic_enabled = config.classic_enabled self.le_simultaneous_enabled = config.le_simultaneous_enabled + self.cis_enabled = config.cis_enabled self.classic_sc_enabled = config.classic_sc_enabled self.classic_ssp_enabled = config.classic_ssp_enabled self.classic_smp_enabled = config.classic_smp_enabled @@ -1084,6 +1554,22 @@ class Device(CompositeEventEmitter): self.classic_accept_any = config.classic_accept_any self.address_resolution_offload = config.address_resolution_offload + # Extended advertising. + self.extended_advertising_sets: Dict[int, AdvertisingSet] = {} + + # Legacy advertising. + # The advertising and scan response data, as well as the advertising interval + # values are stored as properties of this object for convenience so that they + # can be initialized from a config object, and for backward compatibility for + # client code that may set those values directly before calling + # start_advertising(). + self.legacy_advertising_set: Optional[AdvertisingSet] = None + self.legacy_advertiser: Optional[LegacyAdvertiser] = None + self.advertising_data = config.advertising_data + self.scan_response_data = config.scan_response_data + self.advertising_interval_min = config.advertising_interval_min + self.advertising_interval_max = config.advertising_interval_max + for service in config.gatt_services: characteristics = [] for characteristic in service.get("characteristics", []): @@ -1092,7 +1578,8 @@ class Device(CompositeEventEmitter): # Leave this check until 5/25/2023 if descriptor.get("permission", False): raise Exception( - "Error parsing Device Config's GATT Services. The key 'permission' must be renamed to 'permissions'" + "Error parsing Device Config's GATT Services. " + "The key 'permission' must be renamed to 'permissions'" ) new_descriptor = Descriptor( attribute_type=descriptor["descriptor_type"], @@ -1308,7 +1795,7 @@ class Device(CompositeEventEmitter): self.host.send_command(command, check_result), self.command_timeout ) except asyncio.TimeoutError as error: - logger.warning('!!! Command timed out') + logger.warning(f'!!! Command {command.name} timed out') raise CommandTimeoutError() from error async def power_on(self) -> None: @@ -1316,7 +1803,7 @@ class Device(CompositeEventEmitter): await self.host.reset() # Try to get the public address from the controller - response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg] + response = await self.send_command(HCI_Read_BD_ADDR_Command()) if response.return_parameters.status == HCI_SUCCESS: logger.debug( color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow') @@ -1339,7 +1826,7 @@ class Device(CompositeEventEmitter): HCI_Write_LE_Host_Support_Command( le_supported_host=int(self.le_enabled), simultaneous_le_host=int(self.le_simultaneous_enabled), - ) # type: ignore[call-arg] + ) ) if self.le_enabled: @@ -1349,7 +1836,7 @@ class Device(CompositeEventEmitter): if self.host.supports_command(HCI_LE_RAND_COMMAND): # Get 8 random bytes response = await self.send_command( - HCI_LE_Rand_Command(), check_result=True # type: ignore[call-arg] + HCI_LE_Rand_Command(), check_result=True ) # Ensure the address bytes can be a static random address @@ -1370,7 +1857,7 @@ class Device(CompositeEventEmitter): await self.send_command( HCI_LE_Set_Random_Address_Command( random_address=self.random_address - ), # type: ignore[call-arg] + ), check_result=True, ) @@ -1383,25 +1870,33 @@ class Device(CompositeEventEmitter): await self.send_command( HCI_LE_Set_Address_Resolution_Enable_Command( address_resolution_enable=1 - ) # type: ignore[call-arg] + ) + ) + + if self.cis_enabled: + await self.send_command( + HCI_LE_Set_Host_Feature_Command( + bit_number=LeFeature.CONNECTED_ISOCHRONOUS_STREAM, + bit_value=1, + ) ) if self.classic_enabled: await self.send_command( - HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg] + HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) ) await self.send_command( - HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) # type: ignore[call-arg] + HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) ) await self.send_command( HCI_Write_Simple_Pairing_Mode_Command( simple_pairing_mode=int(self.classic_ssp_enabled) - ) # type: ignore[call-arg] + ) ) await self.send_command( HCI_Write_Secure_Connections_Host_Support_Command( secure_connections_host_support=int(self.classic_sc_enabled) - ) # type: ignore[call-arg] + ) ) await self.set_connectable(self.connectable) await self.set_discoverable(self.discoverable) @@ -1409,6 +1904,9 @@ class Device(CompositeEventEmitter): # Done self.powered_on = True + async def reset(self) -> None: + await self.host.reset() + async def power_off(self) -> None: if self.powered_on: await self.host.flush() @@ -1422,7 +1920,17 @@ class Device(CompositeEventEmitter): self.address_resolver = smp.AddressResolver(resolving_keys) if self.address_resolution_offload: - await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg] + await self.send_command(HCI_LE_Clear_Resolving_List_Command()) + + # Add an empty entry for non-directed address generation. + await self.send_command( + HCI_LE_Add_Device_To_Resolving_List_Command( + peer_identity_address_type=Address.ANY.address_type, + peer_identity_address=Address.ANY, + peer_irk=bytes(16), + local_irk=self.irk, + ) + ) for irk, address in resolving_keys: await self.send_command( @@ -1431,24 +1939,28 @@ class Device(CompositeEventEmitter): peer_identity_address=address, peer_irk=irk, local_irk=self.irk, - ) # type: ignore[call-arg] + ) ) - def supports_le_feature(self, feature): - return self.host.supports_le_feature(feature) + def supports_le_features(self, feature: LeFeatureMask) -> bool: + return self.host.supports_le_features(feature) def supports_le_phy(self, phy): if phy == HCI_LE_1M_PHY: return True feature_map = { - HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE, - HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE, + HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY, + HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY, } if phy not in feature_map: raise ValueError('invalid PHY') - return self.host.supports_le_feature(feature_map[phy]) + return self.supports_le_features(feature_map[phy]) + + @property + def supports_le_extended_advertising(self): + return self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING) async def start_advertising( self, @@ -1456,82 +1968,261 @@ class Device(CompositeEventEmitter): target: Optional[Address] = None, own_address_type: int = OwnAddressType.RANDOM, auto_restart: bool = False, + advertising_data: Optional[bytes] = None, + scan_response_data: Optional[bytes] = None, + advertising_interval_min: Optional[int] = None, + advertising_interval_max: Optional[int] = None, ) -> None: - # If we're advertising, stop first - if self.advertising: - await self.stop_advertising() - - # Set/update the advertising data if the advertising type allows it - if advertising_type.has_data: - await self.send_command( - HCI_LE_Set_Advertising_Data_Command( - advertising_data=self.advertising_data - ), # type: ignore[call-arg] - check_result=True, - ) - - # Set/update the scan response data if the advertising is scannable - if advertising_type.is_scannable: - await self.send_command( - HCI_LE_Set_Scan_Response_Data_Command( - scan_response_data=self.scan_response_data - ), # type: ignore[call-arg] - check_result=True, - ) + """Start legacy advertising. + + If the controller supports it, extended advertising commands with legacy PDUs + will be used to advertise. If not, legacy advertising commands will be used. + + Args: + advertising_type: + Type of advertising events. + target: + Peer address for directed advertising target. + (Ignored if `advertising_type` is not directed) + own_address_type: + Own address type to use in the advertising. + auto_restart: + Whether the advertisement will be restarted after disconnection. + advertising_data: + Raw advertising data. If None, the value of the property + self.advertising_data will be used. + scan_response_data: + Raw scan response. If None, the value of the property + self.scan_response_data will be used. + advertising_interval_min: + Minimum advertising interval, in milliseconds. If None, the value of the + property self.advertising_interval_min will be used. + advertising_interval_max: + Maximum advertising interval, in milliseconds. If None, the value of the + property self.advertising_interval_max will be used. + """ + # Update backing properties. + if advertising_data is not None: + self.advertising_data = advertising_data + if scan_response_data is not None: + self.scan_response_data = scan_response_data + if advertising_interval_min is not None: + self.advertising_interval_min = advertising_interval_min + if advertising_interval_max is not None: + self.advertising_interval_max = advertising_interval_max # Decide what peer address to use if advertising_type.is_directed: if target is None: - raise ValueError('directed advertising requires a target address') - + raise ValueError('directed advertising requires a target') peer_address = target - peer_address_type = target.address_type else: - peer_address = Address('00:00:00:00:00:00') - peer_address_type = Address.PUBLIC_DEVICE_ADDRESS - - # Set the advertising parameters - await self.send_command( - HCI_LE_Set_Advertising_Parameters_Command( - advertising_interval_min=self.advertising_interval_min, - advertising_interval_max=self.advertising_interval_max, - advertising_type=int(advertising_type), - own_address_type=own_address_type, - peer_address_type=peer_address_type, + peer_address = Address.ANY + + # If we're already advertising, stop now because we'll be re-creating + # a new advertiser or advertising set. + await self.stop_advertising() + assert self.legacy_advertiser is None + assert self.legacy_advertising_set is None + + if self.supports_le_extended_advertising: + # Use extended advertising commands with legacy PDUs. + self.legacy_advertising_set = await self.create_advertising_set( + auto_start=True, + auto_restart=auto_restart, + random_address=self.random_address, + advertising_parameters=AdvertisingParameters( + advertising_event_properties=( + AdvertisingEventProperties.from_advertising_type( + advertising_type + ) + ), + primary_advertising_interval_min=self.advertising_interval_min, + primary_advertising_interval_max=self.advertising_interval_max, + own_address_type=OwnAddressType(own_address_type), + peer_address=peer_address, + ), + advertising_data=( + self.advertising_data if advertising_type.has_data else b'' + ), + scan_response_data=( + self.scan_response_data if advertising_type.is_scannable else b'' + ), + ) + else: + # Use legacy commands. + self.legacy_advertiser = LegacyAdvertiser( + device=self, + advertising_type=advertising_type, + own_address_type=OwnAddressType(own_address_type), peer_address=peer_address, - advertising_channel_map=7, - advertising_filter_policy=0, - ), # type: ignore[call-arg] - check_result=True, - ) - - # Enable advertising - await self.send_command( - HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), # type: ignore[call-arg] - check_result=True, - ) + auto_restart=auto_restart, + ) - self.advertising_type = advertising_type - self.advertising_own_address_type = own_address_type - self.advertising = True - self.auto_restart_advertising = auto_restart + await self.legacy_advertiser.start() async def stop_advertising(self) -> None: + """Stop legacy advertising.""" # Disable advertising - if self.advertising: + if self.legacy_advertising_set: + if self.legacy_advertising_set.enabled: + await self.legacy_advertising_set.stop() + await self.legacy_advertising_set.remove() + self.legacy_advertising_set = None + elif self.legacy_advertiser: + await self.legacy_advertiser.stop() + self.legacy_advertiser = None + + async def create_advertising_set( + self, + advertising_parameters: Optional[AdvertisingParameters] = None, + random_address: Optional[Address] = None, + advertising_data: bytes = b'', + scan_response_data: bytes = b'', + periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None, + periodic_advertising_data: bytes = b'', + auto_start: bool = True, + auto_restart: bool = False, + ) -> AdvertisingSet: + """ + Create an advertising set. + + This method allows the creation of advertising sets for controllers that + support extended advertising. + + Args: + advertising_parameters: + The parameters to use for this set. If None, default parameters are used. + random_address: + The random address to use (only relevant when the parameters specify that + own_address_type is random). + advertising_data: + Initial value for the set's advertising data. + scan_response_data: + Initial value for the set's scan response data. + periodic_advertising_parameters: + The parameters to use for periodic advertising (if needed). + periodic_advertising_data: + Initial value for the set's periodic advertising data. + auto_start: + True if the set should be automatically started upon creation. + auto_restart: + True if the set should be automatically restated after a disconnection. + + Returns: + An AdvertisingSet instance. + """ + # Instantiate default values + if advertising_parameters is None: + advertising_parameters = AdvertisingParameters() + + if ( + not advertising_parameters.advertising_event_properties.is_legacy + and advertising_data + and scan_response_data + ): + raise ValueError( + "Extended advertisements can't have both data and scan \ + response data" + ) + + # Allocate a new handle + try: + advertising_handle = next( + handle + for handle in range( + DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, + DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, + ) + if handle not in self.extended_advertising_sets + ) + except StopIteration as exc: + raise RuntimeError("all valid advertising handles already in use") from exc + + # Use the device's random address if a random address is needed but none was + # provided. + if ( + advertising_parameters.own_address_type + in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) + and random_address is None + ): + random_address = self.random_address + + # Create the object that represents the set. + advertising_set = AdvertisingSet( + device=self, + advertising_handle=advertising_handle, + auto_restart=auto_restart, + random_address=random_address, + advertising_parameters=advertising_parameters, + advertising_data=advertising_data, + scan_response_data=scan_response_data, + periodic_advertising_parameters=periodic_advertising_parameters, + periodic_advertising_data=periodic_advertising_data, + ) + + # Create the set in the controller. + await advertising_set.set_advertising_parameters(advertising_parameters) + + # Update the set in the controller. + try: + if random_address: + await advertising_set.set_random_address(random_address) + + if advertising_data: + await advertising_set.set_advertising_data(advertising_data) + + if scan_response_data: + await advertising_set.set_scan_response_data(scan_response_data) + + if periodic_advertising_parameters: + # TODO: call LE Set Periodic Advertising Parameters command + raise NotImplementedError('periodic advertising not yet supported') + + if periodic_advertising_data: + # TODO: call LE Set Periodic Advertising Data command + raise NotImplementedError('periodic advertising not yet supported') + + except HCI_Error as error: + # Remove the advertising set so that it doesn't stay dangling in the + # controller. await self.send_command( - HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), # type: ignore[call-arg] - check_result=True, + HCI_LE_Remove_Advertising_Set_Command( + advertising_handle=advertising_data + ), + check_result=False, ) + raise error + + # Remember the set. + self.extended_advertising_sets[advertising_handle] = advertising_set - self.advertising_type = None - self.advertising_own_address_type = None - self.advertising = False - self.auto_restart_advertising = False + # Try to start the set if requested. + if auto_start: + try: + # pylint: disable=line-too-long + duration = ( + DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION + if advertising_parameters.advertising_event_properties.is_high_duty_cycle_directed_connectable + else 0 + ) + await advertising_set.start(duration=duration) + except Exception as error: + logger.exception(f'failed to start advertising set: {error}') + await advertising_set.remove() + raise + + return advertising_set @property def is_advertising(self): - return self.advertising + if self.legacy_advertiser: + return True + + return any( + advertising_set.enabled + for advertising_set in self.extended_advertising_sets.values() + ) async def start_scanning( self, @@ -1541,7 +2232,7 @@ class Device(CompositeEventEmitter): scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms own_address_type: int = OwnAddressType.RANDOM, filter_duplicates: bool = False, - scanning_phys: Tuple[int, int] = (HCI_LE_1M_PHY, HCI_LE_CODED_PHY), + scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY], ) -> None: # Check that the arguments are legal if scan_interval < scan_window: @@ -1558,9 +2249,7 @@ class Device(CompositeEventEmitter): self.advertisement_accumulators = {} # Enable scanning - if not legacy and self.supports_le_feature( - HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE - ): + if not legacy and self.supports_le_extended_advertising: # Set the scanning parameters scan_type = ( HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING @@ -1577,7 +2266,7 @@ class Device(CompositeEventEmitter): scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT scanning_phy_count += 1 if HCI_LE_CODED_PHY in scanning_phys: - if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE): + if self.supports_le_features(LeFeatureMask.LE_CODED_PHY): scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT scanning_phy_count += 1 @@ -1592,7 +2281,7 @@ class Device(CompositeEventEmitter): scan_types=[scan_type] * scanning_phy_count, scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count, scan_windows=[int(scan_window / 0.625)] * scanning_phy_count, - ), # type: ignore[call-arg] + ), check_result=True, ) @@ -1603,7 +2292,7 @@ class Device(CompositeEventEmitter): filter_duplicates=1 if filter_duplicates else 0, duration=0, # TODO allow other values period=0, # TODO allow other values - ), # type: ignore[call-arg] + ), check_result=True, ) else: @@ -1621,7 +2310,7 @@ class Device(CompositeEventEmitter): le_scan_window=int(scan_window / 0.625), own_address_type=own_address_type, scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY, - ), # type: ignore[call-arg] + ), check_result=True, ) @@ -1629,25 +2318,25 @@ class Device(CompositeEventEmitter): await self.send_command( HCI_LE_Set_Scan_Enable_Command( le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0 - ), # type: ignore[call-arg] + ), check_result=True, ) self.scanning_is_passive = not active self.scanning = True - async def stop_scanning(self) -> None: + async def stop_scanning(self, legacy: bool = False) -> None: # Disable scanning - if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE): + if not legacy and self.supports_le_extended_advertising: await self.send_command( HCI_LE_Set_Extended_Scan_Enable_Command( enable=0, filter_duplicates=0, duration=0, period=0 - ), # type: ignore[call-arg] + ), check_result=True, ) else: await self.send_command( - HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0), # type: ignore[call-arg] + HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0), check_result=True, ) @@ -1667,7 +2356,7 @@ class Device(CompositeEventEmitter): async def start_discovery(self, auto_restart: bool = True) -> None: await self.send_command( - HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), # type: ignore[call-arg] + HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), check_result=True, ) @@ -1676,7 +2365,7 @@ class Device(CompositeEventEmitter): lap=HCI_GENERAL_INQUIRY_LAP, inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH, num_responses=0, # Unlimited number of responses. - ) # type: ignore[call-arg] + ) ) if response.status != HCI_Command_Status_Event.PENDING: self.discovering = False @@ -1687,7 +2376,7 @@ class Device(CompositeEventEmitter): async def stop_discovery(self) -> None: if self.discovering: - await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True) # type: ignore[call-arg] + await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True) self.auto_restart_inquiry = True self.discovering = False @@ -1735,7 +2424,7 @@ class Device(CompositeEventEmitter): await self.send_command( HCI_Write_Extended_Inquiry_Response_Command( fec_required=0, extended_inquiry_response=self.inquiry_response - ), # type: ignore[call-arg] + ), check_result=True, ) await self.set_scan_enable( @@ -1924,7 +2613,7 @@ class Device(CompositeEventEmitter): supervision_timeouts=supervision_timeouts, min_ce_lengths=min_ce_lengths, max_ce_lengths=max_ce_lengths, - ) # type: ignore[call-arg] + ) ) else: if HCI_LE_1M_PHY not in connection_parameters_preferences: @@ -1953,7 +2642,7 @@ class Device(CompositeEventEmitter): supervision_timeout=int(prefs.supervision_timeout / 10), min_ce_length=int(prefs.min_ce_length / 0.625), max_ce_length=int(prefs.max_ce_length / 0.625), - ) # type: ignore[call-arg] + ) ) else: # Save pending connection @@ -1970,7 +2659,7 @@ class Device(CompositeEventEmitter): clock_offset=0x0000, allow_role_switch=0x01, reserved=0, - ) # type: ignore[call-arg] + ) ) if result.status != HCI_Command_Status_Event.PENDING: @@ -1989,10 +2678,10 @@ class Device(CompositeEventEmitter): ) except asyncio.TimeoutError: if transport == BT_LE_TRANSPORT: - await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) # type: ignore[call-arg] + await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) else: await self.send_command( - HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) # type: ignore[call-arg] + HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) ) try: @@ -2106,7 +2795,7 @@ class Device(CompositeEventEmitter): try: # Accept connection request await self.send_command( - HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role) # type: ignore[call-arg] + HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role) ) # Wait for connection complete @@ -2163,7 +2852,9 @@ class Device(CompositeEventEmitter): check_result=True, ) - async def disconnect(self, connection, reason): + async def disconnect( + self, connection: Union[Connection, ScoLink, CisLink], reason: int + ) -> None: # Create a future so that we can wait for the disconnection's result pending_disconnection = asyncio.get_running_loop().create_future() connection.on('disconnection', pending_disconnection.set_result) @@ -2190,6 +2881,22 @@ class Device(CompositeEventEmitter): ) self.disconnecting = False + async def set_data_length(self, connection, tx_octets, tx_time) -> None: + if tx_octets < 0x001B or tx_octets > 0x00FB: + raise ValueError('tx_octets must be between 0x001B and 0x00FB') + + if tx_time < 0x0148 or tx_time > 0x4290: + raise ValueError('tx_time must be between 0x0148 and 0x4290') + + return await self.send_command( + HCI_LE_Set_Data_Length_Command( + connection_handle=connection.handle, + tx_octets=tx_octets, + tx_time=tx_time, + ), + check_result=True, + ) + async def update_connection_parameters( self, connection, @@ -2232,7 +2939,7 @@ class Device(CompositeEventEmitter): supervision_timeout=supervision_timeout, min_ce_length=min_ce_length, max_ce_length=max_ce_length, - ) # type: ignore[call-arg] + ) ) if result.status != HCI_Command_Status_Event.PENDING: raise HCI_StatusError(result) @@ -2560,7 +3267,7 @@ class Device(CompositeEventEmitter): try: result = await self.send_command( - HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) # type: ignore[call-arg] + HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) ) if result.status != HCI_COMMAND_STATUS_PENDING: logger.warning( @@ -2602,7 +3309,7 @@ class Device(CompositeEventEmitter): page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2, reserved=0, clock_offset=0, # TODO investigate non-0 values - ) # type: ignore[call-arg] + ) ) if result.status != HCI_COMMAND_STATUS_PENDING: @@ -2618,6 +3325,172 @@ class Device(CompositeEventEmitter): self.remove_listener('remote_name', handler) self.remove_listener('remote_name_failure', failure_handler) + # [LE only] + @experimental('Only for testing.') + async def setup_cig( + self, + cig_id: int, + cis_id: List[int], + sdu_interval: Tuple[int, int], + framing: int, + max_sdu: Tuple[int, int], + retransmission_number: int, + max_transport_latency: Tuple[int, int], + ) -> List[int]: + """Sends HCI_LE_Set_CIG_Parameters_Command. + + Args: + cig_id: CIG_ID. + cis_id: CID ID list. + sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental). + framing: Un-framing(0) or Framing(1). + max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental). + retransmission_number: retransmission_number. + max_transport_latency: Max transport latencies of + (Central->Peripheral, Peripheral->Cental). + + Returns: + List of created CIS handles corresponding to the same order of [cid_id]. + """ + num_cis = len(cis_id) + + response = await self.send_command( + HCI_LE_Set_CIG_Parameters_Command( + cig_id=cig_id, + sdu_interval_c_to_p=sdu_interval[0], + sdu_interval_p_to_c=sdu_interval[1], + worst_case_sca=0x00, # 251-500 ppm + packing=0x00, # Sequential + framing=framing, + max_transport_latency_c_to_p=max_transport_latency[0], + max_transport_latency_p_to_c=max_transport_latency[1], + cis_id=cis_id, + max_sdu_c_to_p=[max_sdu[0]] * num_cis, + max_sdu_p_to_c=[max_sdu[1]] * num_cis, + phy_c_to_p=[HCI_LE_2M_PHY] * num_cis, + phy_p_to_c=[HCI_LE_2M_PHY] * num_cis, + rtn_c_to_p=[retransmission_number] * num_cis, + rtn_p_to_c=[retransmission_number] * num_cis, + ), + check_result=True, + ) + + # Ideally, we should manage CIG lifecycle, but they are not useful for Unicast + # Server, so here it only provides a basic functionality for testing. + cis_handles = response.return_parameters.connection_handle[:] + for id, cis_handle in zip(cis_id, cis_handles): + self._pending_cis[cis_handle] = (id, cig_id) + + return cis_handles + + # [LE only] + @experimental('Only for testing.') + async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]: + for cis_handle, acl_handle in cis_acl_pairs: + acl_connection = self.lookup_connection(acl_handle) + assert acl_connection + cis_id, cig_id = self._pending_cis.pop(cis_handle) + self.cis_links[cis_handle] = CisLink( + device=self, + acl_connection=acl_connection, + handle=cis_handle, + cis_id=cis_id, + cig_id=cig_id, + ) + + with closing(EventWatcher()) as watcher: + pending_cis_establishments = { + cis_handle: asyncio.get_running_loop().create_future() + for cis_handle, _ in cis_acl_pairs + } + + @watcher.on(self, 'cis_establishment') + def on_cis_establishment(cis_link: CisLink) -> None: + if pending_future := pending_cis_establishments.get(cis_link.handle): + pending_future.set_result(cis_link) + + result = await self.send_command( + HCI_LE_Create_CIS_Command( + cis_connection_handle=[p[0] for p in cis_acl_pairs], + acl_connection_handle=[p[1] for p in cis_acl_pairs], + ), + ) + if result.status != HCI_COMMAND_STATUS_PENDING: + logger.warning( + 'HCI_LE_Create_CIS_Command failed: ' + f'{HCI_Constant.error_name(result.status)}' + ) + raise HCI_StatusError(result) + + return await asyncio.gather(*pending_cis_establishments.values()) + + # [LE only] + @experimental('Only for testing.') + async def accept_cis_request(self, handle: int) -> CisLink: + result = await self.send_command( + HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), + ) + if result.status != HCI_COMMAND_STATUS_PENDING: + logger.warning( + 'HCI_LE_Accept_CIS_Request_Command failed: ' + f'{HCI_Constant.error_name(result.status)}' + ) + raise HCI_StatusError(result) + + pending_cis_establishment = asyncio.get_running_loop().create_future() + + with closing(EventWatcher()) as watcher: + + @watcher.on(self, 'cis_establishment') + def on_cis_establishment(cis_link: CisLink) -> None: + if cis_link.handle == handle: + pending_cis_establishment.set_result(cis_link) + + return await pending_cis_establishment + + # [LE only] + @experimental('Only for testing.') + async def reject_cis_request( + self, + handle: int, + reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, + ) -> None: + result = await self.send_command( + HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason), + ) + if result.status != HCI_COMMAND_STATUS_PENDING: + logger.warning( + 'HCI_LE_Reject_CIS_Request_Command failed: ' + f'{HCI_Constant.error_name(result.status)}' + ) + raise HCI_StatusError(result) + + async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask: + """[LE Only] Reads remote LE supported features. + + Args: + handle: connection handle to read LE features. + + Returns: + LE features supported by the remote device. + """ + with closing(EventWatcher()) as watcher: + read_feature_future: asyncio.Future[ + LeFeatureMask + ] = asyncio.get_running_loop().create_future() + + def on_le_remote_features(handle: int, features: int): + if handle == connection.handle: + read_feature_future.set_result(LeFeatureMask(features)) + + watcher.on(self.host, 'le_remote_features', on_le_remote_features) + await self.send_command( + HCI_LE_Read_Remote_Features_Command( + connection_handle=connection.handle + ), + ) + return await read_feature_future + @host_event_handler def on_flush(self): self.emit('flush') @@ -2670,6 +3543,74 @@ class Device(CompositeEventEmitter): await self.gatt_server.indicate_subscribers(attribute, value, force) @host_event_handler + def on_advertising_set_termination( + self, + status, + advertising_handle, + connection_handle, + number_of_completed_extended_advertising_events, + ): + # Legacy advertising set is also one of extended advertising sets. + if not ( + advertising_set := self.extended_advertising_sets.get(advertising_handle) + ): + logger.warning(f'advertising set {advertising_handle} not found') + return + + advertising_set.on_termination(status) + + if status != HCI_SUCCESS: + logger.debug( + f'advertising set {advertising_handle} ' + f'terminated with status {status}' + ) + return + + if not (connection := self.lookup_connection(connection_handle)): + logger.warning(f'no connection for handle 0x{connection_handle:04x}') + return + + # Update the connection address. + connection.self_address = ( + advertising_set.random_address + if advertising_set.advertising_parameters.own_address_type + in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) + else self.public_address + ) + + # Setup auto-restart of the advertising set if needed. + if advertising_set.auto_restart: + connection.once( + 'disconnection', + lambda _: self.abort_on('flush', advertising_set.start()), + ) + + self._emit_le_connection(connection) + + def _emit_le_connection(self, connection: Connection) -> None: + # If supported, read which PHY we're connected with before + # notifying listeners of the new connection. + if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): + + async def read_phy(): + result = await self.send_command( + HCI_LE_Read_PHY_Command(connection_handle=connection.handle), + check_result=True, + ) + connection.phy = ConnectionPHY( + result.return_parameters.tx_phy, result.return_parameters.rx_phy + ) + # Emit an event to notify listeners of the new connection + self.emit('connection', connection) + + # Do so asynchronously to not block the current event handler + connection.abort_on('disconnection', read_phy()) + + return + + self.emit('connection', connection) + + @host_event_handler def on_connection( self, connection_handle, @@ -2687,8 +3628,6 @@ class Device(CompositeEventEmitter): 'new connection reuses the same handle as a previous connection' ) - peer_resolvable_address = None - if transport == BT_BR_EDR_TRANSPORT: # Create a new connection connection = self.pending_connections.pop(peer_address) @@ -2697,70 +3636,76 @@ class Device(CompositeEventEmitter): # Emit an event to notify listeners of the new connection self.emit('connection', connection) - else: - # Resolve the peer address if we can - if self.address_resolver: - if peer_address.is_resolvable: - resolved_address = self.address_resolver.resolve(peer_address) - if resolved_address is not None: - logger.debug(f'*** Address resolved as {resolved_address}') - peer_resolvable_address = peer_address - peer_address = resolved_address - - # Guess which own address type is used for this connection. - # This logic is somewhat correct but may need to be improved - # when multiple advertising are run simultaneously. - if self.connect_own_address_type is not None: - own_address_type = self.connect_own_address_type - else: - own_address_type = self.advertising_own_address_type - # We are no longer advertising - self.advertising = False + return - if own_address_type in ( - OwnAddressType.PUBLIC, - OwnAddressType.RESOLVABLE_OR_PUBLIC, - ): - self_address = self.public_address + # Resolve the peer address if we can + peer_resolvable_address = None + if self.address_resolver: + if peer_address.is_resolvable: + resolved_address = self.address_resolver.resolve(peer_address) + if resolved_address is not None: + logger.debug(f'*** Address resolved as {resolved_address}') + peer_resolvable_address = peer_address + peer_address = resolved_address + + self_address = None + if role == HCI_CENTRAL_ROLE: + own_address_type = self.connect_own_address_type + assert own_address_type is not None + else: + if self.supports_le_extended_advertising: + # We'll know the address when the advertising set terminates, + # Use a temporary placeholder value for self_address. + self_address = Address.ANY_RANDOM else: - self_address = self.random_address - - # Create a new connection - connection = Connection( - self, - connection_handle, - transport, - self_address, - peer_address, - peer_resolvable_address, - role, - connection_parameters, - ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), + # We were connected via a legacy advertisement. + if self.legacy_advertiser: + own_address_type = self.legacy_advertiser.own_address_type + self.legacy_advertiser = None + else: + # This should not happen, but just in case, pick a default. + logger.warning("connection without an advertiser") + self_address = self.random_address + + if self_address is None: + self_address = ( + self.public_address + if own_address_type + in ( + OwnAddressType.PUBLIC, + OwnAddressType.RESOLVABLE_OR_PUBLIC, + ) + else self.random_address ) - self.connections[connection_handle] = connection - - # If supported, read which PHY we're connected with before - # notifying listeners of the new connection. - if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): - async def read_phy(): - result = await self.send_command( - HCI_LE_Read_PHY_Command(connection_handle=connection_handle), - check_result=True, - ) - connection.phy = ConnectionPHY( - result.return_parameters.tx_phy, result.return_parameters.rx_phy - ) - # Emit an event to notify listeners of the new connection - self.emit('connection', connection) + # Create a connection. + connection = Connection( + self, + connection_handle, + transport, + self_address, + peer_address, + peer_resolvable_address, + role, + connection_parameters, + ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), + ) + self.connections[connection_handle] = connection - # Do so asynchronously to not block the current event handler - connection.abort_on('disconnection', read_phy()) + if ( + role == HCI_PERIPHERAL_ROLE + and self.legacy_advertiser + and self.legacy_advertiser.auto_restart + ): + connection.once( + 'disconnection', + lambda _: self.abort_on('flush', self.legacy_advertiser.start()), + ) - else: - # Emit an event to notify listeners of the new connection - self.emit('connection', connection) + if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising: + # We can emit now, we have all the info we need + self._emit_le_connection(connection) @host_event_handler def on_connection_failure(self, transport, peer_address, error_code): @@ -2769,10 +3714,10 @@ class Device(CompositeEventEmitter): # For directed advertising, this means a timeout if ( transport == BT_LE_TRANSPORT - and self.advertising - and self.advertising_type.is_directed + and self.legacy_advertiser + and self.legacy_advertiser.advertising_type.is_directed ): - self.advertising = False + self.legacy_advertiser = None # Notify listeners error = core.ConnectionError( @@ -2789,8 +3734,21 @@ class Device(CompositeEventEmitter): def on_connection_request(self, bd_addr, class_of_device, link_type): logger.debug(f'*** Connection request: {bd_addr}') + # Handle SCO request. + if link_type in ( + HCI_Connection_Complete_Event.SCO_LINK_TYPE, + HCI_Connection_Complete_Event.ESCO_LINK_TYPE, + ): + if connection := self.find_connection_by_bd_addr( + bd_addr, transport=BT_BR_EDR_TRANSPORT + ): + self.emit('sco_request', connection, link_type) + else: + logger.error(f'SCO request from a non-connected device {bd_addr}') + return + # match a pending future using `bd_addr` - if bd_addr in self.classic_pending_accepts: + elif bd_addr in self.classic_pending_accepts: future, *_ = self.classic_pending_accepts.pop(bd_addr) future.set_result((bd_addr, class_of_device, link_type)) @@ -2822,30 +3780,23 @@ class Device(CompositeEventEmitter): ) @host_event_handler - @with_connection_from_handle - def on_disconnection(self, connection, reason): - logger.debug( - f'*** Disconnection: [0x{connection.handle:04X}] ' - f'{connection.peer_address} as {connection.role_name}, reason={reason}' - ) - connection.emit('disconnection', reason) - - # Remove the connection from the map - del self.connections[connection.handle] - - # Cleanup subsystems that maintain per-connection state - self.gatt_server.on_disconnection(connection) - - # Restart advertising if auto-restart is enabled - if self.auto_restart_advertising: - logger.debug('restarting advertising') - self.abort_on( - 'flush', - self.start_advertising( - advertising_type=self.advertising_type, - own_address_type=self.advertising_own_address_type, - auto_restart=True, - ), + def on_disconnection(self, connection_handle: int, reason: int) -> None: + if connection := self.connections.pop(connection_handle, None): + logger.debug( + f'*** Disconnection: [0x{connection.handle:04X}] ' + f'{connection.peer_address} as {connection.role_name}, reason={reason}' + ) + connection.emit('disconnection', reason) + + # Cleanup subsystems that maintain per-connection state + self.gatt_server.on_disconnection(connection) + elif sco_link := self.sco_links.pop(connection_handle, None): + sco_link.emit('disconnection', reason) + elif cis_link := self.cis_links.pop(connection_handle, None): + cis_link.emit('disconnection', reason) + else: + logger.error( + f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***' ) @host_event_handler @@ -2996,7 +3947,7 @@ class Device(CompositeEventEmitter): try: if await connection.abort_on('disconnection', method()): await self.host.send_command( - HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg] + HCI_User_Confirmation_Request_Reply_Command( bd_addr=connection.peer_address ) ) @@ -3005,7 +3956,7 @@ class Device(CompositeEventEmitter): logger.warning(f'exception while confirming: {error}') await self.host.send_command( - HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg] + HCI_User_Confirmation_Request_Negative_Reply_Command( bd_addr=connection.peer_address ) ) @@ -3026,7 +3977,7 @@ class Device(CompositeEventEmitter): ) if number is not None: await self.host.send_command( - HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg] + HCI_User_Passkey_Request_Reply_Command( bd_addr=connection.peer_address, numeric_value=number ) ) @@ -3035,7 +3986,7 @@ class Device(CompositeEventEmitter): logger.warning(f'exception while asking for pass-key: {error}') await self.host.send_command( - HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg] + HCI_User_Passkey_Request_Negative_Reply_Command( bd_addr=connection.peer_address ) ) @@ -3124,6 +4075,107 @@ class Device(CompositeEventEmitter): connection.emit('remote_name_failure', error) self.emit('remote_name_failure', address, error) + # [Classic only] + @host_event_handler + @with_connection_from_address + @experimental('Only for testing.') + def on_sco_connection( + self, acl_connection: Connection, sco_handle: int, link_type: int + ) -> None: + logger.debug( + f'*** SCO connected: {acl_connection.peer_address}, ' + f'sco_handle=[0x{sco_handle:04X}], ' + f'link_type=[0x{link_type:02X}] ***' + ) + sco_link = self.sco_links[sco_handle] = ScoLink( + device=self, + acl_connection=acl_connection, + handle=sco_handle, + link_type=link_type, + ) + self.emit('sco_connection', sco_link) + + # [Classic only] + @host_event_handler + @with_connection_from_address + @experimental('Only for testing.') + def on_sco_connection_failure( + self, acl_connection: Connection, status: int + ) -> None: + logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***') + self.emit('sco_connection_failure') + + # [Classic only] + @host_event_handler + @experimental('Only for testing') + def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None: + if sco_link := self.sco_links.get(sco_handle): + sco_link.emit('pdu', packet) + + # [LE only] + @host_event_handler + @with_connection_from_handle + @experimental('Only for testing') + def on_cis_request( + self, + acl_connection: Connection, + cis_handle: int, + cig_id: int, + cis_id: int, + ) -> None: + logger.debug( + f'*** CIS Request ' + f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, ' + f'cis_handle=[0x{cis_handle:04X}], ' + f'cig_id=[0x{cig_id:02X}], ' + f'cis_id=[0x{cis_id:02X}] ***' + ) + # LE_CIS_Established event doesn't provide info, so we must store them here. + self.cis_links[cis_handle] = CisLink( + device=self, + acl_connection=acl_connection, + handle=cis_handle, + cig_id=cig_id, + cis_id=cis_id, + ) + self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id) + + # [LE only] + @host_event_handler + @experimental('Only for testing') + def on_cis_establishment(self, cis_handle: int) -> None: + cis_link = self.cis_links[cis_handle] + cis_link.state = CisLink.State.ESTABLISHED + + assert cis_link.acl_connection + + logger.debug( + f'*** CIS Establishment ' + f'{cis_link.acl_connection.peer_address}, ' + f'cis_handle=[0x{cis_handle:04X}], ' + f'cig_id=[0x{cis_link.cig_id:02X}], ' + f'cis_id=[0x{cis_link.cis_id:02X}] ***' + ) + + cis_link.emit('establishment') + self.emit('cis_establishment', cis_link) + + # [LE only] + @host_event_handler + @experimental('Only for testing') + def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: + logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***') + if cis_link := self.cis_links.pop(cis_handle): + cis_link.emit('establishment_failure') + self.emit('cis_establishment_failure', cis_handle, status) + + # [LE only] + @host_event_handler + @experimental('Only for testing') + def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None: + if cis_link := self.cis_links.get(handle): + cis_link.emit('pdu', packet) + @host_event_handler @with_connection_from_handle def on_connection_encryption_change(self, connection, encryption): @@ -3135,10 +4187,18 @@ class Device(CompositeEventEmitter): connection.encryption = encryption if ( not connection.authenticated + and connection.transport == BT_BR_EDR_TRANSPORT and encryption == HCI_Encryption_Change_Event.AES_CCM ): connection.authenticated = True connection.sc = True + if ( + not connection.authenticated + and connection.transport == BT_LE_TRANSPORT + and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM + ): + connection.authenticated = True + connection.sc = True connection.emit('connection_encryption_change') @host_event_handler |