aboutsummaryrefslogtreecommitdiff
path: root/bumble/device.py
diff options
context:
space:
mode:
Diffstat (limited to 'bumble/device.py')
-rw-r--r--bumble/device.py1620
1 files changed, 1340 insertions, 280 deletions
diff --git a/bumble/device.py b/bumble/device.py
index 7f11012..48f9d58 100644
--- a/bumble/device.py
+++ b/bumble/device.py
@@ -21,8 +21,10 @@ import functools
import json
import asyncio
import logging
-from contextlib import asynccontextmanager, AsyncExitStack
-from dataclasses import dataclass
+import secrets
+from contextlib import asynccontextmanager, AsyncExitStack, closing
+from dataclasses import dataclass, field
+from collections.abc import Iterable
from typing import (
Any,
Callable,
@@ -32,12 +34,15 @@ from typing import (
Optional,
Tuple,
Type,
+ TypeVar,
Union,
cast,
overload,
TYPE_CHECKING,
)
+from pyee import EventEmitter
+
from .colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
from .gatt import Characteristic, Descriptor, Service
@@ -45,6 +50,7 @@ from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE,
+ HCI_PERIPHERAL_ROLE,
HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
@@ -56,12 +62,8 @@ from .hci import (
HCI_LE_1M_PHY,
HCI_LE_1M_PHY_BIT,
HCI_LE_2M_PHY,
- HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
- HCI_LE_CLEAR_RESOLVING_LIST_COMMAND,
HCI_LE_CODED_PHY,
HCI_LE_CODED_PHY_BIT,
- HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE,
- HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE,
HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND,
HCI_LE_RAND_COMMAND,
HCI_LE_READ_PHY_COMMAND,
@@ -75,37 +77,52 @@ from .hci import (
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
HCI_SUCCESS,
HCI_WRITE_LE_HOST_SUPPORT_COMMAND,
- Address,
HCI_Accept_Connection_Request_Command,
HCI_Authentication_Requested_Command,
HCI_Command_Status_Event,
HCI_Constant,
HCI_Create_Connection_Cancel_Command,
HCI_Create_Connection_Command,
+ HCI_Connection_Complete_Event,
HCI_Disconnect_Command,
HCI_Encryption_Change_Event,
HCI_Error,
HCI_IO_Capability_Request_Reply_Command,
HCI_Inquiry_Cancel_Command,
HCI_Inquiry_Command,
+ HCI_IsoDataPacket,
+ HCI_LE_Accept_CIS_Request_Command,
HCI_LE_Add_Device_To_Resolving_List_Command,
HCI_LE_Advertising_Report_Event,
HCI_LE_Clear_Resolving_List_Command,
HCI_LE_Connection_Update_Command,
HCI_LE_Create_Connection_Cancel_Command,
HCI_LE_Create_Connection_Command,
+ HCI_LE_Create_CIS_Command,
HCI_LE_Enable_Encryption_Command,
HCI_LE_Extended_Advertising_Report_Event,
HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command,
+ HCI_LE_Read_Remote_Features_Command,
+ HCI_LE_Reject_CIS_Request_Command,
+ HCI_LE_Remove_Advertising_Set_Command,
HCI_LE_Set_Address_Resolution_Enable_Command,
HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Enable_Command,
HCI_LE_Set_Advertising_Parameters_Command,
+ HCI_LE_Set_Advertising_Set_Random_Address_Command,
+ HCI_LE_Set_CIG_Parameters_Command,
+ HCI_LE_Set_Data_Length_Command,
HCI_LE_Set_Default_PHY_Command,
HCI_LE_Set_Extended_Scan_Enable_Command,
HCI_LE_Set_Extended_Scan_Parameters_Command,
+ HCI_LE_Set_Extended_Scan_Response_Data_Command,
+ HCI_LE_Set_Extended_Advertising_Data_Command,
+ HCI_LE_Set_Extended_Advertising_Enable_Command,
+ HCI_LE_Set_Extended_Advertising_Parameters_Command,
+ HCI_LE_Set_Host_Feature_Command,
+ HCI_LE_Set_Periodic_Advertising_Enable_Command,
HCI_LE_Set_PHY_Command,
HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command,
@@ -120,6 +137,7 @@ from .hci import (
HCI_Switch_Role_Command,
HCI_Set_Connection_Encryption_Command,
HCI_StatusError,
+ HCI_SynchronousDataPacket,
HCI_User_Confirmation_Request_Negative_Reply_Command,
HCI_User_Confirmation_Request_Reply_Command,
HCI_User_Passkey_Request_Negative_Reply_Command,
@@ -132,7 +150,11 @@ from .hci import (
HCI_Write_Scan_Enable_Command,
HCI_Write_Secure_Connections_Host_Support_Command,
HCI_Write_Simple_Pairing_Mode_Command,
+ Address,
OwnAddressType,
+ LeFeature,
+ LeFeatureMask,
+ Phy,
phy_list_to_bits,
)
from .host import Host
@@ -151,9 +173,11 @@ from .core import (
from .utils import (
AsyncRunner,
CompositeEventEmitter,
+ EventWatcher,
setup_event_forwarding,
composite_listener,
deprecated,
+ experimental,
)
from .keys import (
KeyStore,
@@ -188,6 +212,8 @@ DEVICE_MIN_SCAN_WINDOW = 25
DEVICE_MAX_SCAN_WINDOW = 10240
DEVICE_MIN_LE_RSSI = -127
DEVICE_MAX_LE_RSSI = 20
+DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00
+DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF
DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00'
DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms
@@ -211,10 +237,16 @@ DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH = 0 # ms
DEVICE_DEFAULT_L2CAP_COC_MTU = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
DEVICE_DEFAULT_L2CAP_COC_MPS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
+DEVICE_DEFAULT_ADVERTISING_TX_POWER = (
+ HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE
+)
# fmt: on
# pylint: enable=line-too-long
+# As specified in 7.8.56 LE Set Extended Advertising Enable command
+DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION = 1.28
+
# -----------------------------------------------------------------------------
# Classes
@@ -222,16 +254,40 @@ DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONN
# -----------------------------------------------------------------------------
+@dataclass
class Advertisement:
+ # Attributes
address: Address
-
- TX_POWER_NOT_AVAILABLE = (
+ rssi: int = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
+ is_legacy: bool = False
+ is_anonymous: bool = False
+ is_connectable: bool = False
+ is_directed: bool = False
+ is_scannable: bool = False
+ is_scan_response: bool = False
+ is_complete: bool = True
+ is_truncated: bool = False
+ primary_phy: int = 0
+ secondary_phy: int = 0
+ tx_power: int = (
HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
)
- RSSI_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
+ sid: int = 0
+ data_bytes: bytes = b''
+
+ # Constants
+ TX_POWER_NOT_AVAILABLE: ClassVar[
+ int
+ ] = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
+ RSSI_NOT_AVAILABLE: ClassVar[
+ int
+ ] = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE
+
+ def __post_init__(self) -> None:
+ self.data = AdvertisingData.from_bytes(self.data_bytes)
@classmethod
- def from_advertising_report(cls, report):
+ def from_advertising_report(cls, report) -> Optional[Advertisement]:
if isinstance(report, HCI_LE_Advertising_Report_Event.Report):
return LegacyAdvertisement.from_advertising_report(report)
@@ -240,41 +296,6 @@ class Advertisement:
return None
- # pylint: disable=line-too-long
- def __init__(
- self,
- address,
- rssi=HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE,
- is_legacy=False,
- is_anonymous=False,
- is_connectable=False,
- is_directed=False,
- is_scannable=False,
- is_scan_response=False,
- is_complete=True,
- is_truncated=False,
- primary_phy=0,
- secondary_phy=0,
- tx_power=HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE,
- sid=0,
- data=b'',
- ):
- self.address = address
- self.rssi = rssi
- self.is_legacy = is_legacy
- self.is_anonymous = is_anonymous
- self.is_connectable = is_connectable
- self.is_directed = is_directed
- self.is_scannable = is_scannable
- self.is_scan_response = is_scan_response
- self.is_complete = is_complete
- self.is_truncated = is_truncated
- self.primary_phy = primary_phy
- self.secondary_phy = secondary_phy
- self.tx_power = tx_power
- self.sid = sid
- self.data = AdvertisingData.from_bytes(data)
-
# -----------------------------------------------------------------------------
class LegacyAdvertisement(Advertisement):
@@ -298,7 +319,7 @@ class LegacyAdvertisement(Advertisement):
),
is_scan_response=report.event_type
== HCI_LE_Advertising_Report_Event.SCAN_RSP,
- data=report.data,
+ data_bytes=report.data,
)
@@ -323,7 +344,7 @@ class ExtendedAdvertisement(Advertisement):
secondary_phy = report.secondary_phy,
tx_power = report.tx_power,
sid = report.advertising_sid,
- data = report.data
+ data_bytes = report.data
)
# fmt: on
@@ -384,7 +405,7 @@ class AdvertisingType(IntEnum):
# fmt: on
@property
- def has_data(self):
+ def has_data(self) -> bool:
return self in (
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.UNDIRECTED_SCANNABLE,
@@ -392,7 +413,7 @@ class AdvertisingType(IntEnum):
)
@property
- def is_connectable(self):
+ def is_connectable(self) -> bool:
return self in (
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
@@ -400,19 +421,369 @@ class AdvertisingType(IntEnum):
)
@property
- def is_scannable(self):
+ def is_scannable(self) -> bool:
return self in (
AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
AdvertisingType.UNDIRECTED_SCANNABLE,
)
@property
- def is_directed(self):
+ def is_directed(self) -> bool:
return self in (
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY,
AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY,
)
+ @property
+ def is_high_duty_cycle_directed_connectable(self):
+ return self == AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class LegacyAdvertiser:
+ device: Device
+ advertising_type: AdvertisingType
+ own_address_type: OwnAddressType
+ peer_address: Address
+ auto_restart: bool
+
+ async def start(self) -> None:
+ # Set/update the advertising data if the advertising type allows it
+ if self.advertising_type.has_data:
+ await self.device.send_command(
+ HCI_LE_Set_Advertising_Data_Command(
+ advertising_data=self.device.advertising_data
+ ),
+ check_result=True,
+ )
+
+ # Set/update the scan response data if the advertising is scannable
+ if self.advertising_type.is_scannable:
+ await self.device.send_command(
+ HCI_LE_Set_Scan_Response_Data_Command(
+ scan_response_data=self.device.scan_response_data
+ ),
+ check_result=True,
+ )
+
+ # Set the advertising parameters
+ await self.device.send_command(
+ HCI_LE_Set_Advertising_Parameters_Command(
+ advertising_interval_min=self.device.advertising_interval_min,
+ advertising_interval_max=self.device.advertising_interval_max,
+ advertising_type=int(self.advertising_type),
+ own_address_type=self.own_address_type,
+ peer_address_type=self.peer_address.address_type,
+ peer_address=self.peer_address,
+ advertising_channel_map=7,
+ advertising_filter_policy=0,
+ ),
+ check_result=True,
+ )
+
+ # Enable advertising
+ await self.device.send_command(
+ HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1),
+ check_result=True,
+ )
+
+ async def stop(self) -> None:
+ # Disable advertising
+ await self.device.send_command(
+ HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),
+ check_result=True,
+ )
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class AdvertisingEventProperties:
+ is_connectable: bool = True
+ is_scannable: bool = False
+ is_directed: bool = False
+ is_high_duty_cycle_directed_connectable: bool = False
+ is_legacy: bool = False
+ is_anonymous: bool = False
+ include_tx_power: bool = False
+
+ def __int__(self) -> int:
+ properties = (
+ HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(0)
+ )
+ if self.is_connectable:
+ properties |= properties.CONNECTABLE_ADVERTISING
+ if self.is_scannable:
+ properties |= properties.SCANNABLE_ADVERTISING
+ if self.is_directed:
+ properties |= properties.DIRECTED_ADVERTISING
+ if self.is_high_duty_cycle_directed_connectable:
+ properties |= properties.HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING
+ if self.is_legacy:
+ properties |= properties.USE_LEGACY_ADVERTISING_PDUS
+ if self.is_anonymous:
+ properties |= properties.ANONYMOUS_ADVERTISING
+ if self.include_tx_power:
+ properties |= properties.INCLUDE_TX_POWER
+
+ return int(properties)
+
+ @classmethod
+ def from_advertising_type(
+ cls: Type[AdvertisingEventProperties],
+ advertising_type: AdvertisingType,
+ ) -> AdvertisingEventProperties:
+ return cls(
+ is_connectable=advertising_type.is_connectable,
+ is_scannable=advertising_type.is_scannable,
+ is_directed=advertising_type.is_directed,
+ is_high_duty_cycle_directed_connectable=advertising_type.is_high_duty_cycle_directed_connectable,
+ is_legacy=True,
+ is_anonymous=False,
+ include_tx_power=False,
+ )
+
+
+# -----------------------------------------------------------------------------
+# TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10
+AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class AdvertisingParameters:
+ # pylint: disable=line-too-long
+ advertising_event_properties: AdvertisingEventProperties = field(
+ default_factory=AdvertisingEventProperties
+ )
+ primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
+ primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
+ primary_advertising_channel_map: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap = (
+ AdvertisingChannelMap.CHANNEL_37
+ | AdvertisingChannelMap.CHANNEL_38
+ | AdvertisingChannelMap.CHANNEL_39
+ )
+ own_address_type: OwnAddressType = OwnAddressType.RANDOM
+ peer_address: Address = Address.ANY
+ advertising_filter_policy: int = 0
+ advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER
+ primary_advertising_phy: Phy = Phy.LE_1M
+ secondary_advertising_max_skip: int = 0
+ secondary_advertising_phy: Phy = Phy.LE_1M
+ advertising_sid: int = 0
+ enable_scan_request_notifications: bool = False
+ primary_advertising_phy_options: int = 0
+ secondary_advertising_phy_options: int = 0
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class PeriodicAdvertisingParameters:
+ # TODO implement this class
+ pass
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class AdvertisingSet(EventEmitter):
+ device: Device
+ advertising_handle: int
+ auto_restart: bool
+ random_address: Optional[Address]
+ advertising_parameters: AdvertisingParameters
+ advertising_data: bytes
+ scan_response_data: bytes
+ periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters]
+ periodic_advertising_data: bytes
+ selected_tx_power: int = 0
+ enabled: bool = False
+
+ def __post_init__(self) -> None:
+ super().__init__()
+
+ async def set_advertising_parameters(
+ self, advertising_parameters: AdvertisingParameters
+ ) -> None:
+ # Compliance check
+ if (
+ not advertising_parameters.advertising_event_properties.is_legacy
+ and advertising_parameters.advertising_event_properties.is_connectable
+ and advertising_parameters.advertising_event_properties.is_scannable
+ ):
+ logger.warning(
+ "non-legacy extended advertising event properties may not be both "
+ "connectable and scannable"
+ )
+
+ response = await self.device.send_command(
+ HCI_LE_Set_Extended_Advertising_Parameters_Command(
+ advertising_handle=self.advertising_handle,
+ advertising_event_properties=int(
+ advertising_parameters.advertising_event_properties
+ ),
+ primary_advertising_interval_min=(
+ int(advertising_parameters.primary_advertising_interval_min / 0.625)
+ ),
+ primary_advertising_interval_max=(
+ int(advertising_parameters.primary_advertising_interval_min / 0.625)
+ ),
+ primary_advertising_channel_map=int(
+ advertising_parameters.primary_advertising_channel_map
+ ),
+ own_address_type=advertising_parameters.own_address_type,
+ peer_address_type=advertising_parameters.peer_address.address_type,
+ peer_address=advertising_parameters.peer_address,
+ advertising_tx_power=advertising_parameters.advertising_tx_power,
+ advertising_filter_policy=(
+ advertising_parameters.advertising_filter_policy
+ ),
+ primary_advertising_phy=advertising_parameters.primary_advertising_phy,
+ secondary_advertising_max_skip=(
+ advertising_parameters.secondary_advertising_max_skip
+ ),
+ secondary_advertising_phy=(
+ advertising_parameters.secondary_advertising_phy
+ ),
+ advertising_sid=advertising_parameters.advertising_sid,
+ scan_request_notification_enable=(
+ 1 if advertising_parameters.enable_scan_request_notifications else 0
+ ),
+ ),
+ check_result=True,
+ )
+ self.selected_tx_power = response.return_parameters.selected_tx_power
+ self.advertising_parameters = advertising_parameters
+
+ async def set_advertising_data(self, advertising_data: bytes) -> None:
+ # pylint: disable=line-too-long
+ await self.device.send_command(
+ HCI_LE_Set_Extended_Advertising_Data_Command(
+ advertising_handle=self.advertising_handle,
+ operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
+ fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT,
+ advertising_data=advertising_data,
+ ),
+ check_result=True,
+ )
+ self.advertising_data = advertising_data
+
+ async def set_scan_response_data(self, scan_response_data: bytes) -> None:
+ # pylint: disable=line-too-long
+ if (
+ scan_response_data
+ and not self.advertising_parameters.advertising_event_properties.is_scannable
+ ):
+ logger.warning(
+ "ignoring attempt to set non-empty scan response data on non-scannable "
+ "advertising set"
+ )
+ return
+
+ await self.device.send_command(
+ HCI_LE_Set_Extended_Scan_Response_Data_Command(
+ advertising_handle=self.advertising_handle,
+ operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
+ fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT,
+ scan_response_data=scan_response_data,
+ ),
+ check_result=True,
+ )
+ self.scan_response_data = scan_response_data
+
+ async def set_periodic_advertising_parameters(
+ self, advertising_parameters: PeriodicAdvertisingParameters
+ ) -> None:
+ # TODO: send command
+ self.periodic_advertising_parameters = advertising_parameters
+
+ async def set_periodic_advertising_data(self, advertising_data: bytes) -> None:
+ # TODO: send command
+ self.periodic_advertising_data = advertising_data
+
+ async def set_random_address(self, random_address: Address) -> None:
+ await self.device.send_command(
+ HCI_LE_Set_Advertising_Set_Random_Address_Command(
+ advertising_handle=self.advertising_handle,
+ random_address=(random_address or self.device.random_address),
+ ),
+ check_result=True,
+ )
+
+ async def start(
+ self, duration: float = 0.0, max_advertising_events: int = 0
+ ) -> None:
+ """
+ Start advertising.
+
+ Args:
+ duration: How long to advertise for, in seconds. Use 0 (the default) for
+ an unlimited duration, unless this advertising set is a High Duty Cycle
+ Directed Advertisement type.
+ max_advertising_events: Maximum number of events to advertise for. Use 0
+ (the default) for an unlimited number of advertisements.
+ """
+ await self.device.send_command(
+ HCI_LE_Set_Extended_Advertising_Enable_Command(
+ enable=1,
+ advertising_handles=[self.advertising_handle],
+ durations=[round(duration * 100)],
+ max_extended_advertising_events=[max_advertising_events],
+ ),
+ check_result=True,
+ )
+ self.enabled = True
+
+ self.emit('start')
+
+ async def start_periodic(self, include_adi: bool = False) -> None:
+ await self.device.send_command(
+ HCI_LE_Set_Periodic_Advertising_Enable_Command(
+ enable=1 | (2 if include_adi else 0),
+ advertising_handles=self.advertising_handle,
+ ),
+ check_result=True,
+ )
+
+ self.emit('start_periodic')
+
+ async def stop(self) -> None:
+ await self.device.send_command(
+ HCI_LE_Set_Extended_Advertising_Enable_Command(
+ enable=0,
+ advertising_handles=[self.advertising_handle],
+ durations=[0],
+ max_extended_advertising_events=[0],
+ ),
+ check_result=True,
+ )
+ self.enabled = False
+
+ self.emit('stop')
+
+ async def stop_periodic(self) -> None:
+ await self.device.send_command(
+ HCI_LE_Set_Periodic_Advertising_Enable_Command(
+ enable=0,
+ advertising_handles=self.advertising_handle,
+ ),
+ check_result=True,
+ )
+
+ self.emit('stop_periodic')
+
+ async def remove(self) -> None:
+ await self.device.send_command(
+ HCI_LE_Remove_Advertising_Set_Command(
+ advertising_handle=self.advertising_handle
+ ),
+ check_result=True,
+ )
+ del self.device.extended_advertising_sets[self.advertising_handle]
+
+ def on_termination(self, status: int) -> None:
+ self.enabled = False
+ self.emit('termination', status)
+
# -----------------------------------------------------------------------------
class LePhyOptions:
@@ -429,8 +800,11 @@ class LePhyOptions:
# -----------------------------------------------------------------------------
+_PROXY_CLASS = TypeVar('_PROXY_CLASS', bound=gatt_client.ProfileServiceProxy)
+
+
class Peer:
- def __init__(self, connection):
+ def __init__(self, connection: Connection) -> None:
self.connection = connection
# Create a GATT client for the connection
@@ -438,77 +812,113 @@ class Peer:
connection.gatt_client = self.gatt_client
@property
- def services(self):
+ def services(self) -> List[gatt_client.ServiceProxy]:
return self.gatt_client.services
- async def request_mtu(self, mtu):
+ async def request_mtu(self, mtu: int) -> int:
mtu = await self.gatt_client.request_mtu(mtu)
self.connection.emit('connection_att_mtu_update')
return mtu
- async def discover_service(self, uuid):
+ async def discover_service(
+ self, uuid: Union[core.UUID, str]
+ ) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_service(uuid)
- async def discover_services(self, uuids=()):
+ async def discover_services(
+ self, uuids: Iterable[core.UUID] = ()
+ ) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_services(uuids)
- async def discover_included_services(self, service):
+ async def discover_included_services(
+ self, service: gatt_client.ServiceProxy
+ ) -> List[gatt_client.ServiceProxy]:
return await self.gatt_client.discover_included_services(service)
- async def discover_characteristics(self, uuids=(), service=None):
+ async def discover_characteristics(
+ self,
+ uuids: Iterable[Union[core.UUID, str]] = (),
+ service: Optional[gatt_client.ServiceProxy] = None,
+ ) -> List[gatt_client.CharacteristicProxy]:
return await self.gatt_client.discover_characteristics(
uuids=uuids, service=service
)
async def discover_descriptors(
- self, characteristic=None, start_handle=None, end_handle=None
+ self,
+ characteristic: Optional[gatt_client.CharacteristicProxy] = None,
+ start_handle: Optional[int] = None,
+ end_handle: Optional[int] = None,
):
return await self.gatt_client.discover_descriptors(
characteristic, start_handle, end_handle
)
- async def discover_attributes(self):
+ async def discover_attributes(self) -> List[gatt_client.AttributeProxy]:
return await self.gatt_client.discover_attributes()
- async def subscribe(self, characteristic, subscriber=None, prefer_notify=True):
+ async def subscribe(
+ self,
+ characteristic: gatt_client.CharacteristicProxy,
+ subscriber: Optional[Callable[[bytes], Any]] = None,
+ prefer_notify: bool = True,
+ ) -> None:
return await self.gatt_client.subscribe(
characteristic, subscriber, prefer_notify
)
- async def unsubscribe(self, characteristic, subscriber=None):
+ async def unsubscribe(
+ self,
+ characteristic: gatt_client.CharacteristicProxy,
+ subscriber: Optional[Callable[[bytes], Any]] = None,
+ ) -> None:
return await self.gatt_client.unsubscribe(characteristic, subscriber)
- async def read_value(self, attribute):
+ async def read_value(
+ self, attribute: Union[int, gatt_client.AttributeProxy]
+ ) -> bytes:
return await self.gatt_client.read_value(attribute)
- async def write_value(self, attribute, value, with_response=False):
+ async def write_value(
+ self,
+ attribute: Union[int, gatt_client.AttributeProxy],
+ value: bytes,
+ with_response: bool = False,
+ ) -> None:
return await self.gatt_client.write_value(attribute, value, with_response)
- async def read_characteristics_by_uuid(self, uuid, service=None):
+ async def read_characteristics_by_uuid(
+ self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
+ ) -> List[bytes]:
return await self.gatt_client.read_characteristics_by_uuid(uuid, service)
- def get_services_by_uuid(self, uuid):
+ def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]:
return self.gatt_client.get_services_by_uuid(uuid)
- def get_characteristics_by_uuid(self, uuid, service=None):
+ def get_characteristics_by_uuid(
+ self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None
+ ) -> List[gatt_client.CharacteristicProxy]:
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
- def create_service_proxy(self, proxy_class):
- return proxy_class.from_client(self.gatt_client)
+ def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS:
+ return cast(_PROXY_CLASS, proxy_class.from_client(self.gatt_client))
- async def discover_service_and_create_proxy(self, proxy_class):
+ async def discover_service_and_create_proxy(
+ self, proxy_class: Type[_PROXY_CLASS]
+ ) -> Optional[_PROXY_CLASS]:
# Discover the first matching service and its characteristics
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
if services:
service = services[0]
await service.discover_characteristics()
return self.create_service_proxy(proxy_class)
+ return None
- async def sustain(self, timeout=None):
+ async def sustain(self, timeout: Optional[float] = None) -> None:
await self.connection.sustain(timeout)
# [Classic only]
- async def request_name(self):
+ async def request_name(self) -> str:
return await self.connection.request_remote_name()
async def __aenter__(self):
@@ -521,7 +931,7 @@ class Peer:
async def __aexit__(self, exc_type, exc_value, traceback):
pass
- def __str__(self):
+ def __str__(self) -> str:
return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -541,6 +951,46 @@ ConnectionParametersPreferences.default = ConnectionParametersPreferences()
# -----------------------------------------------------------------------------
+@dataclass
+class ScoLink(CompositeEventEmitter):
+ device: Device
+ acl_connection: Connection
+ handle: int
+ link_type: int
+
+ def __post_init__(self):
+ super().__init__()
+
+ async def disconnect(
+ self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
+ ) -> None:
+ await self.device.disconnect(self, reason)
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class CisLink(CompositeEventEmitter):
+ class State(IntEnum):
+ PENDING = 0
+ ESTABLISHED = 1
+
+ device: Device
+ acl_connection: Connection # Based ACL connection
+ handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
+ cis_id: int # CIS ID assigned by Central device
+ cig_id: int # CIG ID assigned by Central device
+ state: State = State.PENDING
+
+ def __post_init__(self):
+ super().__init__()
+
+ async def disconnect(
+ self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
+ ) -> None:
+ await self.device.disconnect(self, reason)
+
+
+# -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter):
device: Device
handle: int
@@ -548,6 +998,7 @@ class Connection(CompositeEventEmitter):
self_address: Address
peer_address: Address
peer_resolvable_address: Optional[Address]
+ peer_le_features: Optional[LeFeatureMask]
role: int
encryption: int
authenticated: bool
@@ -621,6 +1072,7 @@ class Connection(CompositeEventEmitter):
) # By default, use the device's shared server
self.pairing_peer_io_capability = None
self.pairing_peer_authentication_requirements = None
+ self.peer_le_features = None
# [Classic only]
@classmethod
@@ -721,7 +1173,7 @@ class Connection(CompositeEventEmitter):
async def switch_role(self, role: int) -> None:
return await self.device.switch_role(self, role)
- async def sustain(self, timeout=None):
+ async def sustain(self, timeout: Optional[float] = None) -> None:
"""Idles the current task waiting for a disconnect or timeout"""
abort = asyncio.get_running_loop().create_future()
@@ -736,6 +1188,9 @@ class Connection(CompositeEventEmitter):
self.remove_listener('disconnection', abort.set_result)
self.remove_listener('disconnection_failure', abort.set_exception)
+ async def set_data_length(self, tx_octets, tx_time) -> None:
+ return await self.device.set_data_length(self, tx_octets, tx_time)
+
async def update_parameters(
self,
connection_interval_min,
@@ -766,6 +1221,15 @@ class Connection(CompositeEventEmitter):
async def request_remote_name(self):
return await self.device.request_remote_name(self)
+ async def get_remote_le_features(self) -> LeFeatureMask:
+ """[LE Only] Reads remote LE supported features.
+
+ Returns:
+ LE features supported by the remote device.
+ """
+ self.peer_le_features = await self.device.get_remote_le_features(self)
+ return self.peer_le_features
+
async def __aenter__(self):
return self
@@ -782,7 +1246,8 @@ class Connection(CompositeEventEmitter):
return (
f'Connection(handle=0x{self.handle:04X}, '
f'role={self.role_name}, '
- f'address={self.peer_address})'
+ f'self_address={self.self_address}, '
+ f'peer_address={self.peer_address})'
)
@@ -815,6 +1280,7 @@ class DeviceConfiguration:
self.keystore = None
self.gatt_services: List[Dict[str, Any]] = []
self.address_resolution_offload = False
+ self.cis_enabled = False
def load_from_dict(self, config: Dict[str, Any]) -> None:
# Load simple properties
@@ -850,17 +1316,21 @@ class DeviceConfiguration:
self.address_resolution_offload = config.get(
'address_resolution_offload', self.address_resolution_offload
)
+ self.cis_enabled = config.get('cis_enabled', self.cis_enabled)
# Load or synthesize an IRK
irk = config.get('irk')
if irk:
self.irk = bytes.fromhex(irk)
- else:
+ elif self.address != Address(DEVICE_DEFAULT_ADDRESS):
# Construct an IRK from the address bytes
# NOTE: this is not secure, but will always give the same IRK for the same
# address
address_bytes = bytes(self.address)
self.irk = (address_bytes * 3)[:16]
+ else:
+ # Fallback - when both IRK and address are not set, randomly generate an IRK.
+ self.irk = secrets.token_bytes(16)
# Load advertising data
advertising_data = config.get('advertising_data')
@@ -890,7 +1360,7 @@ def with_connection_from_handle(function):
@functools.wraps(function)
def wrapper(self, connection_handle, *args, **kwargs):
if (connection := self.lookup_connection(connection_handle)) is None:
- raise ValueError(f"no connection for handle: 0x{connection_handle:04x}")
+ raise ValueError(f'no connection for handle: 0x{connection_handle:04x}')
return function(self, connection, *args, **kwargs)
return wrapper
@@ -956,6 +1426,10 @@ class Device(CompositeEventEmitter):
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
+ legacy_advertiser: Optional[LegacyAdvertiser]
+ sco_links: Dict[int, ScoLink]
+ cis_links: Dict[int, CisLink]
+ _pending_cis: Dict[int, Tuple[int, int]]
@composite_listener
class Listener:
@@ -1030,10 +1504,7 @@ class Device(CompositeEventEmitter):
self._host = None
self.powered_on = False
- self.advertising = False
- self.advertising_type = None
self.auto_restart_inquiry = True
- self.auto_restart_advertising = False
self.command_timeout = 10 # seconds
self.gatt_server = gatt_server.Server(self)
self.sdp_server = sdp.Server(self)
@@ -1048,6 +1519,9 @@ class Device(CompositeEventEmitter):
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.pending_connections = {} # Connections, by BD address (BR/EDR only)
+ self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only)
+ self.cis_links = {} # CisLinks, by connection handle (LE only)
+ self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle
self.classic_enabled = False
self.inquiry_response = None
self.address_resolver = None
@@ -1056,7 +1530,6 @@ class Device(CompositeEventEmitter):
} # Futures, by BD address OR [Futures] for Address.ANY
# Own address type cache
- self.advertising_own_address_type = None
self.connect_own_address_type = None
# Use the initial config or a default
@@ -1067,15 +1540,12 @@ class Device(CompositeEventEmitter):
self.name = config.name
self.random_address = config.address
self.class_of_device = config.class_of_device
- self.scan_response_data = config.scan_response_data
- self.advertising_data = config.advertising_data
- self.advertising_interval_min = config.advertising_interval_min
- self.advertising_interval_max = config.advertising_interval_max
self.keystore = None
self.irk = config.irk
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
+ self.cis_enabled = config.cis_enabled
self.classic_sc_enabled = config.classic_sc_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_smp_enabled = config.classic_smp_enabled
@@ -1084,6 +1554,22 @@ class Device(CompositeEventEmitter):
self.classic_accept_any = config.classic_accept_any
self.address_resolution_offload = config.address_resolution_offload
+ # Extended advertising.
+ self.extended_advertising_sets: Dict[int, AdvertisingSet] = {}
+
+ # Legacy advertising.
+ # The advertising and scan response data, as well as the advertising interval
+ # values are stored as properties of this object for convenience so that they
+ # can be initialized from a config object, and for backward compatibility for
+ # client code that may set those values directly before calling
+ # start_advertising().
+ self.legacy_advertising_set: Optional[AdvertisingSet] = None
+ self.legacy_advertiser: Optional[LegacyAdvertiser] = None
+ self.advertising_data = config.advertising_data
+ self.scan_response_data = config.scan_response_data
+ self.advertising_interval_min = config.advertising_interval_min
+ self.advertising_interval_max = config.advertising_interval_max
+
for service in config.gatt_services:
characteristics = []
for characteristic in service.get("characteristics", []):
@@ -1092,7 +1578,8 @@ class Device(CompositeEventEmitter):
# Leave this check until 5/25/2023
if descriptor.get("permission", False):
raise Exception(
- "Error parsing Device Config's GATT Services. The key 'permission' must be renamed to 'permissions'"
+ "Error parsing Device Config's GATT Services. "
+ "The key 'permission' must be renamed to 'permissions'"
)
new_descriptor = Descriptor(
attribute_type=descriptor["descriptor_type"],
@@ -1308,7 +1795,7 @@ class Device(CompositeEventEmitter):
self.host.send_command(command, check_result), self.command_timeout
)
except asyncio.TimeoutError as error:
- logger.warning('!!! Command timed out')
+ logger.warning(f'!!! Command {command.name} timed out')
raise CommandTimeoutError() from error
async def power_on(self) -> None:
@@ -1316,7 +1803,7 @@ class Device(CompositeEventEmitter):
await self.host.reset()
# Try to get the public address from the controller
- response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg]
+ response = await self.send_command(HCI_Read_BD_ADDR_Command())
if response.return_parameters.status == HCI_SUCCESS:
logger.debug(
color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow')
@@ -1339,7 +1826,7 @@ class Device(CompositeEventEmitter):
HCI_Write_LE_Host_Support_Command(
le_supported_host=int(self.le_enabled),
simultaneous_le_host=int(self.le_simultaneous_enabled),
- ) # type: ignore[call-arg]
+ )
)
if self.le_enabled:
@@ -1349,7 +1836,7 @@ class Device(CompositeEventEmitter):
if self.host.supports_command(HCI_LE_RAND_COMMAND):
# Get 8 random bytes
response = await self.send_command(
- HCI_LE_Rand_Command(), check_result=True # type: ignore[call-arg]
+ HCI_LE_Rand_Command(), check_result=True
)
# Ensure the address bytes can be a static random address
@@ -1370,7 +1857,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Random_Address_Command(
random_address=self.random_address
- ), # type: ignore[call-arg]
+ ),
check_result=True,
)
@@ -1383,25 +1870,33 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Address_Resolution_Enable_Command(
address_resolution_enable=1
- ) # type: ignore[call-arg]
+ )
+ )
+
+ if self.cis_enabled:
+ await self.send_command(
+ HCI_LE_Set_Host_Feature_Command(
+ bit_number=LeFeature.CONNECTED_ISOCHRONOUS_STREAM,
+ bit_value=1,
+ )
)
if self.classic_enabled:
await self.send_command(
- HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg]
+ HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
)
await self.send_command(
- HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) # type: ignore[call-arg]
+ HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device)
)
await self.send_command(
HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled)
- ) # type: ignore[call-arg]
+ )
)
await self.send_command(
HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled)
- ) # type: ignore[call-arg]
+ )
)
await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable)
@@ -1409,6 +1904,9 @@ class Device(CompositeEventEmitter):
# Done
self.powered_on = True
+ async def reset(self) -> None:
+ await self.host.reset()
+
async def power_off(self) -> None:
if self.powered_on:
await self.host.flush()
@@ -1422,7 +1920,17 @@ class Device(CompositeEventEmitter):
self.address_resolver = smp.AddressResolver(resolving_keys)
if self.address_resolution_offload:
- await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
+ await self.send_command(HCI_LE_Clear_Resolving_List_Command())
+
+ # Add an empty entry for non-directed address generation.
+ await self.send_command(
+ HCI_LE_Add_Device_To_Resolving_List_Command(
+ peer_identity_address_type=Address.ANY.address_type,
+ peer_identity_address=Address.ANY,
+ peer_irk=bytes(16),
+ local_irk=self.irk,
+ )
+ )
for irk, address in resolving_keys:
await self.send_command(
@@ -1431,24 +1939,28 @@ class Device(CompositeEventEmitter):
peer_identity_address=address,
peer_irk=irk,
local_irk=self.irk,
- ) # type: ignore[call-arg]
+ )
)
- def supports_le_feature(self, feature):
- return self.host.supports_le_feature(feature)
+ def supports_le_features(self, feature: LeFeatureMask) -> bool:
+ return self.host.supports_le_features(feature)
def supports_le_phy(self, phy):
if phy == HCI_LE_1M_PHY:
return True
feature_map = {
- HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE,
- HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE,
+ HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY,
+ HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY,
}
if phy not in feature_map:
raise ValueError('invalid PHY')
- return self.host.supports_le_feature(feature_map[phy])
+ return self.supports_le_features(feature_map[phy])
+
+ @property
+ def supports_le_extended_advertising(self):
+ return self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING)
async def start_advertising(
self,
@@ -1456,82 +1968,261 @@ class Device(CompositeEventEmitter):
target: Optional[Address] = None,
own_address_type: int = OwnAddressType.RANDOM,
auto_restart: bool = False,
+ advertising_data: Optional[bytes] = None,
+ scan_response_data: Optional[bytes] = None,
+ advertising_interval_min: Optional[int] = None,
+ advertising_interval_max: Optional[int] = None,
) -> None:
- # If we're advertising, stop first
- if self.advertising:
- await self.stop_advertising()
-
- # Set/update the advertising data if the advertising type allows it
- if advertising_type.has_data:
- await self.send_command(
- HCI_LE_Set_Advertising_Data_Command(
- advertising_data=self.advertising_data
- ), # type: ignore[call-arg]
- check_result=True,
- )
-
- # Set/update the scan response data if the advertising is scannable
- if advertising_type.is_scannable:
- await self.send_command(
- HCI_LE_Set_Scan_Response_Data_Command(
- scan_response_data=self.scan_response_data
- ), # type: ignore[call-arg]
- check_result=True,
- )
+ """Start legacy advertising.
+
+ If the controller supports it, extended advertising commands with legacy PDUs
+ will be used to advertise. If not, legacy advertising commands will be used.
+
+ Args:
+ advertising_type:
+ Type of advertising events.
+ target:
+ Peer address for directed advertising target.
+ (Ignored if `advertising_type` is not directed)
+ own_address_type:
+ Own address type to use in the advertising.
+ auto_restart:
+ Whether the advertisement will be restarted after disconnection.
+ advertising_data:
+ Raw advertising data. If None, the value of the property
+ self.advertising_data will be used.
+ scan_response_data:
+ Raw scan response. If None, the value of the property
+ self.scan_response_data will be used.
+ advertising_interval_min:
+ Minimum advertising interval, in milliseconds. If None, the value of the
+ property self.advertising_interval_min will be used.
+ advertising_interval_max:
+ Maximum advertising interval, in milliseconds. If None, the value of the
+ property self.advertising_interval_max will be used.
+ """
+ # Update backing properties.
+ if advertising_data is not None:
+ self.advertising_data = advertising_data
+ if scan_response_data is not None:
+ self.scan_response_data = scan_response_data
+ if advertising_interval_min is not None:
+ self.advertising_interval_min = advertising_interval_min
+ if advertising_interval_max is not None:
+ self.advertising_interval_max = advertising_interval_max
# Decide what peer address to use
if advertising_type.is_directed:
if target is None:
- raise ValueError('directed advertising requires a target address')
-
+ raise ValueError('directed advertising requires a target')
peer_address = target
- peer_address_type = target.address_type
else:
- peer_address = Address('00:00:00:00:00:00')
- peer_address_type = Address.PUBLIC_DEVICE_ADDRESS
-
- # Set the advertising parameters
- await self.send_command(
- HCI_LE_Set_Advertising_Parameters_Command(
- advertising_interval_min=self.advertising_interval_min,
- advertising_interval_max=self.advertising_interval_max,
- advertising_type=int(advertising_type),
- own_address_type=own_address_type,
- peer_address_type=peer_address_type,
+ peer_address = Address.ANY
+
+ # If we're already advertising, stop now because we'll be re-creating
+ # a new advertiser or advertising set.
+ await self.stop_advertising()
+ assert self.legacy_advertiser is None
+ assert self.legacy_advertising_set is None
+
+ if self.supports_le_extended_advertising:
+ # Use extended advertising commands with legacy PDUs.
+ self.legacy_advertising_set = await self.create_advertising_set(
+ auto_start=True,
+ auto_restart=auto_restart,
+ random_address=self.random_address,
+ advertising_parameters=AdvertisingParameters(
+ advertising_event_properties=(
+ AdvertisingEventProperties.from_advertising_type(
+ advertising_type
+ )
+ ),
+ primary_advertising_interval_min=self.advertising_interval_min,
+ primary_advertising_interval_max=self.advertising_interval_max,
+ own_address_type=OwnAddressType(own_address_type),
+ peer_address=peer_address,
+ ),
+ advertising_data=(
+ self.advertising_data if advertising_type.has_data else b''
+ ),
+ scan_response_data=(
+ self.scan_response_data if advertising_type.is_scannable else b''
+ ),
+ )
+ else:
+ # Use legacy commands.
+ self.legacy_advertiser = LegacyAdvertiser(
+ device=self,
+ advertising_type=advertising_type,
+ own_address_type=OwnAddressType(own_address_type),
peer_address=peer_address,
- advertising_channel_map=7,
- advertising_filter_policy=0,
- ), # type: ignore[call-arg]
- check_result=True,
- )
-
- # Enable advertising
- await self.send_command(
- HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), # type: ignore[call-arg]
- check_result=True,
- )
+ auto_restart=auto_restart,
+ )
- self.advertising_type = advertising_type
- self.advertising_own_address_type = own_address_type
- self.advertising = True
- self.auto_restart_advertising = auto_restart
+ await self.legacy_advertiser.start()
async def stop_advertising(self) -> None:
+ """Stop legacy advertising."""
# Disable advertising
- if self.advertising:
+ if self.legacy_advertising_set:
+ if self.legacy_advertising_set.enabled:
+ await self.legacy_advertising_set.stop()
+ await self.legacy_advertising_set.remove()
+ self.legacy_advertising_set = None
+ elif self.legacy_advertiser:
+ await self.legacy_advertiser.stop()
+ self.legacy_advertiser = None
+
+ async def create_advertising_set(
+ self,
+ advertising_parameters: Optional[AdvertisingParameters] = None,
+ random_address: Optional[Address] = None,
+ advertising_data: bytes = b'',
+ scan_response_data: bytes = b'',
+ periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] = None,
+ periodic_advertising_data: bytes = b'',
+ auto_start: bool = True,
+ auto_restart: bool = False,
+ ) -> AdvertisingSet:
+ """
+ Create an advertising set.
+
+ This method allows the creation of advertising sets for controllers that
+ support extended advertising.
+
+ Args:
+ advertising_parameters:
+ The parameters to use for this set. If None, default parameters are used.
+ random_address:
+ The random address to use (only relevant when the parameters specify that
+ own_address_type is random).
+ advertising_data:
+ Initial value for the set's advertising data.
+ scan_response_data:
+ Initial value for the set's scan response data.
+ periodic_advertising_parameters:
+ The parameters to use for periodic advertising (if needed).
+ periodic_advertising_data:
+ Initial value for the set's periodic advertising data.
+ auto_start:
+ True if the set should be automatically started upon creation.
+ auto_restart:
+ True if the set should be automatically restated after a disconnection.
+
+ Returns:
+ An AdvertisingSet instance.
+ """
+ # Instantiate default values
+ if advertising_parameters is None:
+ advertising_parameters = AdvertisingParameters()
+
+ if (
+ not advertising_parameters.advertising_event_properties.is_legacy
+ and advertising_data
+ and scan_response_data
+ ):
+ raise ValueError(
+ "Extended advertisements can't have both data and scan \
+ response data"
+ )
+
+ # Allocate a new handle
+ try:
+ advertising_handle = next(
+ handle
+ for handle in range(
+ DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE,
+ DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1,
+ )
+ if handle not in self.extended_advertising_sets
+ )
+ except StopIteration as exc:
+ raise RuntimeError("all valid advertising handles already in use") from exc
+
+ # Use the device's random address if a random address is needed but none was
+ # provided.
+ if (
+ advertising_parameters.own_address_type
+ in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM)
+ and random_address is None
+ ):
+ random_address = self.random_address
+
+ # Create the object that represents the set.
+ advertising_set = AdvertisingSet(
+ device=self,
+ advertising_handle=advertising_handle,
+ auto_restart=auto_restart,
+ random_address=random_address,
+ advertising_parameters=advertising_parameters,
+ advertising_data=advertising_data,
+ scan_response_data=scan_response_data,
+ periodic_advertising_parameters=periodic_advertising_parameters,
+ periodic_advertising_data=periodic_advertising_data,
+ )
+
+ # Create the set in the controller.
+ await advertising_set.set_advertising_parameters(advertising_parameters)
+
+ # Update the set in the controller.
+ try:
+ if random_address:
+ await advertising_set.set_random_address(random_address)
+
+ if advertising_data:
+ await advertising_set.set_advertising_data(advertising_data)
+
+ if scan_response_data:
+ await advertising_set.set_scan_response_data(scan_response_data)
+
+ if periodic_advertising_parameters:
+ # TODO: call LE Set Periodic Advertising Parameters command
+ raise NotImplementedError('periodic advertising not yet supported')
+
+ if periodic_advertising_data:
+ # TODO: call LE Set Periodic Advertising Data command
+ raise NotImplementedError('periodic advertising not yet supported')
+
+ except HCI_Error as error:
+ # Remove the advertising set so that it doesn't stay dangling in the
+ # controller.
await self.send_command(
- HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), # type: ignore[call-arg]
- check_result=True,
+ HCI_LE_Remove_Advertising_Set_Command(
+ advertising_handle=advertising_data
+ ),
+ check_result=False,
)
+ raise error
+
+ # Remember the set.
+ self.extended_advertising_sets[advertising_handle] = advertising_set
- self.advertising_type = None
- self.advertising_own_address_type = None
- self.advertising = False
- self.auto_restart_advertising = False
+ # Try to start the set if requested.
+ if auto_start:
+ try:
+ # pylint: disable=line-too-long
+ duration = (
+ DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION
+ if advertising_parameters.advertising_event_properties.is_high_duty_cycle_directed_connectable
+ else 0
+ )
+ await advertising_set.start(duration=duration)
+ except Exception as error:
+ logger.exception(f'failed to start advertising set: {error}')
+ await advertising_set.remove()
+ raise
+
+ return advertising_set
@property
def is_advertising(self):
- return self.advertising
+ if self.legacy_advertiser:
+ return True
+
+ return any(
+ advertising_set.enabled
+ for advertising_set in self.extended_advertising_sets.values()
+ )
async def start_scanning(
self,
@@ -1541,7 +2232,7 @@ class Device(CompositeEventEmitter):
scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
own_address_type: int = OwnAddressType.RANDOM,
filter_duplicates: bool = False,
- scanning_phys: Tuple[int, int] = (HCI_LE_1M_PHY, HCI_LE_CODED_PHY),
+ scanning_phys: List[int] = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY],
) -> None:
# Check that the arguments are legal
if scan_interval < scan_window:
@@ -1558,9 +2249,7 @@ class Device(CompositeEventEmitter):
self.advertisement_accumulators = {}
# Enable scanning
- if not legacy and self.supports_le_feature(
- HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE
- ):
+ if not legacy and self.supports_le_extended_advertising:
# Set the scanning parameters
scan_type = (
HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING
@@ -1577,7 +2266,7 @@ class Device(CompositeEventEmitter):
scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
scanning_phy_count += 1
if HCI_LE_CODED_PHY in scanning_phys:
- if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE):
+ if self.supports_le_features(LeFeatureMask.LE_CODED_PHY):
scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
scanning_phy_count += 1
@@ -1592,7 +2281,7 @@ class Device(CompositeEventEmitter):
scan_types=[scan_type] * scanning_phy_count,
scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count,
scan_windows=[int(scan_window / 0.625)] * scanning_phy_count,
- ), # type: ignore[call-arg]
+ ),
check_result=True,
)
@@ -1603,7 +2292,7 @@ class Device(CompositeEventEmitter):
filter_duplicates=1 if filter_duplicates else 0,
duration=0, # TODO allow other values
period=0, # TODO allow other values
- ), # type: ignore[call-arg]
+ ),
check_result=True,
)
else:
@@ -1621,7 +2310,7 @@ class Device(CompositeEventEmitter):
le_scan_window=int(scan_window / 0.625),
own_address_type=own_address_type,
scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
- ), # type: ignore[call-arg]
+ ),
check_result=True,
)
@@ -1629,25 +2318,25 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Scan_Enable_Command(
le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
- ), # type: ignore[call-arg]
+ ),
check_result=True,
)
self.scanning_is_passive = not active
self.scanning = True
- async def stop_scanning(self) -> None:
+ async def stop_scanning(self, legacy: bool = False) -> None:
# Disable scanning
- if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE):
+ if not legacy and self.supports_le_extended_advertising:
await self.send_command(
HCI_LE_Set_Extended_Scan_Enable_Command(
enable=0, filter_duplicates=0, duration=0, period=0
- ), # type: ignore[call-arg]
+ ),
check_result=True,
)
else:
await self.send_command(
- HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0), # type: ignore[call-arg]
+ HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0),
check_result=True,
)
@@ -1667,7 +2356,7 @@ class Device(CompositeEventEmitter):
async def start_discovery(self, auto_restart: bool = True) -> None:
await self.send_command(
- HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), # type: ignore[call-arg]
+ HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE),
check_result=True,
)
@@ -1676,7 +2365,7 @@ class Device(CompositeEventEmitter):
lap=HCI_GENERAL_INQUIRY_LAP,
inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH,
num_responses=0, # Unlimited number of responses.
- ) # type: ignore[call-arg]
+ )
)
if response.status != HCI_Command_Status_Event.PENDING:
self.discovering = False
@@ -1687,7 +2376,7 @@ class Device(CompositeEventEmitter):
async def stop_discovery(self) -> None:
if self.discovering:
- await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True) # type: ignore[call-arg]
+ await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
self.auto_restart_inquiry = True
self.discovering = False
@@ -1735,7 +2424,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_Write_Extended_Inquiry_Response_Command(
fec_required=0, extended_inquiry_response=self.inquiry_response
- ), # type: ignore[call-arg]
+ ),
check_result=True,
)
await self.set_scan_enable(
@@ -1924,7 +2613,7 @@ class Device(CompositeEventEmitter):
supervision_timeouts=supervision_timeouts,
min_ce_lengths=min_ce_lengths,
max_ce_lengths=max_ce_lengths,
- ) # type: ignore[call-arg]
+ )
)
else:
if HCI_LE_1M_PHY not in connection_parameters_preferences:
@@ -1953,7 +2642,7 @@ class Device(CompositeEventEmitter):
supervision_timeout=int(prefs.supervision_timeout / 10),
min_ce_length=int(prefs.min_ce_length / 0.625),
max_ce_length=int(prefs.max_ce_length / 0.625),
- ) # type: ignore[call-arg]
+ )
)
else:
# Save pending connection
@@ -1970,7 +2659,7 @@ class Device(CompositeEventEmitter):
clock_offset=0x0000,
allow_role_switch=0x01,
reserved=0,
- ) # type: ignore[call-arg]
+ )
)
if result.status != HCI_Command_Status_Event.PENDING:
@@ -1989,10 +2678,10 @@ class Device(CompositeEventEmitter):
)
except asyncio.TimeoutError:
if transport == BT_LE_TRANSPORT:
- await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) # type: ignore[call-arg]
+ await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
else:
await self.send_command(
- HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) # type: ignore[call-arg]
+ HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
)
try:
@@ -2106,7 +2795,7 @@ class Device(CompositeEventEmitter):
try:
# Accept connection request
await self.send_command(
- HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role) # type: ignore[call-arg]
+ HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role)
)
# Wait for connection complete
@@ -2163,7 +2852,9 @@ class Device(CompositeEventEmitter):
check_result=True,
)
- async def disconnect(self, connection, reason):
+ async def disconnect(
+ self, connection: Union[Connection, ScoLink, CisLink], reason: int
+ ) -> None:
# Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future()
connection.on('disconnection', pending_disconnection.set_result)
@@ -2190,6 +2881,22 @@ class Device(CompositeEventEmitter):
)
self.disconnecting = False
+ async def set_data_length(self, connection, tx_octets, tx_time) -> None:
+ if tx_octets < 0x001B or tx_octets > 0x00FB:
+ raise ValueError('tx_octets must be between 0x001B and 0x00FB')
+
+ if tx_time < 0x0148 or tx_time > 0x4290:
+ raise ValueError('tx_time must be between 0x0148 and 0x4290')
+
+ return await self.send_command(
+ HCI_LE_Set_Data_Length_Command(
+ connection_handle=connection.handle,
+ tx_octets=tx_octets,
+ tx_time=tx_time,
+ ),
+ check_result=True,
+ )
+
async def update_connection_parameters(
self,
connection,
@@ -2232,7 +2939,7 @@ class Device(CompositeEventEmitter):
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
- ) # type: ignore[call-arg]
+ )
)
if result.status != HCI_Command_Status_Event.PENDING:
raise HCI_StatusError(result)
@@ -2560,7 +3267,7 @@ class Device(CompositeEventEmitter):
try:
result = await self.send_command(
- HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) # type: ignore[call-arg]
+ HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role)
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
@@ -2602,7 +3309,7 @@ class Device(CompositeEventEmitter):
page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
reserved=0,
clock_offset=0, # TODO investigate non-0 values
- ) # type: ignore[call-arg]
+ )
)
if result.status != HCI_COMMAND_STATUS_PENDING:
@@ -2618,6 +3325,172 @@ class Device(CompositeEventEmitter):
self.remove_listener('remote_name', handler)
self.remove_listener('remote_name_failure', failure_handler)
+ # [LE only]
+ @experimental('Only for testing.')
+ async def setup_cig(
+ self,
+ cig_id: int,
+ cis_id: List[int],
+ sdu_interval: Tuple[int, int],
+ framing: int,
+ max_sdu: Tuple[int, int],
+ retransmission_number: int,
+ max_transport_latency: Tuple[int, int],
+ ) -> List[int]:
+ """Sends HCI_LE_Set_CIG_Parameters_Command.
+
+ Args:
+ cig_id: CIG_ID.
+ cis_id: CID ID list.
+ sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental).
+ framing: Un-framing(0) or Framing(1).
+ max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental).
+ retransmission_number: retransmission_number.
+ max_transport_latency: Max transport latencies of
+ (Central->Peripheral, Peripheral->Cental).
+
+ Returns:
+ List of created CIS handles corresponding to the same order of [cid_id].
+ """
+ num_cis = len(cis_id)
+
+ response = await self.send_command(
+ HCI_LE_Set_CIG_Parameters_Command(
+ cig_id=cig_id,
+ sdu_interval_c_to_p=sdu_interval[0],
+ sdu_interval_p_to_c=sdu_interval[1],
+ worst_case_sca=0x00, # 251-500 ppm
+ packing=0x00, # Sequential
+ framing=framing,
+ max_transport_latency_c_to_p=max_transport_latency[0],
+ max_transport_latency_p_to_c=max_transport_latency[1],
+ cis_id=cis_id,
+ max_sdu_c_to_p=[max_sdu[0]] * num_cis,
+ max_sdu_p_to_c=[max_sdu[1]] * num_cis,
+ phy_c_to_p=[HCI_LE_2M_PHY] * num_cis,
+ phy_p_to_c=[HCI_LE_2M_PHY] * num_cis,
+ rtn_c_to_p=[retransmission_number] * num_cis,
+ rtn_p_to_c=[retransmission_number] * num_cis,
+ ),
+ check_result=True,
+ )
+
+ # Ideally, we should manage CIG lifecycle, but they are not useful for Unicast
+ # Server, so here it only provides a basic functionality for testing.
+ cis_handles = response.return_parameters.connection_handle[:]
+ for id, cis_handle in zip(cis_id, cis_handles):
+ self._pending_cis[cis_handle] = (id, cig_id)
+
+ return cis_handles
+
+ # [LE only]
+ @experimental('Only for testing.')
+ async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]:
+ for cis_handle, acl_handle in cis_acl_pairs:
+ acl_connection = self.lookup_connection(acl_handle)
+ assert acl_connection
+ cis_id, cig_id = self._pending_cis.pop(cis_handle)
+ self.cis_links[cis_handle] = CisLink(
+ device=self,
+ acl_connection=acl_connection,
+ handle=cis_handle,
+ cis_id=cis_id,
+ cig_id=cig_id,
+ )
+
+ with closing(EventWatcher()) as watcher:
+ pending_cis_establishments = {
+ cis_handle: asyncio.get_running_loop().create_future()
+ for cis_handle, _ in cis_acl_pairs
+ }
+
+ @watcher.on(self, 'cis_establishment')
+ def on_cis_establishment(cis_link: CisLink) -> None:
+ if pending_future := pending_cis_establishments.get(cis_link.handle):
+ pending_future.set_result(cis_link)
+
+ result = await self.send_command(
+ HCI_LE_Create_CIS_Command(
+ cis_connection_handle=[p[0] for p in cis_acl_pairs],
+ acl_connection_handle=[p[1] for p in cis_acl_pairs],
+ ),
+ )
+ if result.status != HCI_COMMAND_STATUS_PENDING:
+ logger.warning(
+ 'HCI_LE_Create_CIS_Command failed: '
+ f'{HCI_Constant.error_name(result.status)}'
+ )
+ raise HCI_StatusError(result)
+
+ return await asyncio.gather(*pending_cis_establishments.values())
+
+ # [LE only]
+ @experimental('Only for testing.')
+ async def accept_cis_request(self, handle: int) -> CisLink:
+ result = await self.send_command(
+ HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
+ )
+ if result.status != HCI_COMMAND_STATUS_PENDING:
+ logger.warning(
+ 'HCI_LE_Accept_CIS_Request_Command failed: '
+ f'{HCI_Constant.error_name(result.status)}'
+ )
+ raise HCI_StatusError(result)
+
+ pending_cis_establishment = asyncio.get_running_loop().create_future()
+
+ with closing(EventWatcher()) as watcher:
+
+ @watcher.on(self, 'cis_establishment')
+ def on_cis_establishment(cis_link: CisLink) -> None:
+ if cis_link.handle == handle:
+ pending_cis_establishment.set_result(cis_link)
+
+ return await pending_cis_establishment
+
+ # [LE only]
+ @experimental('Only for testing.')
+ async def reject_cis_request(
+ self,
+ handle: int,
+ reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
+ ) -> None:
+ result = await self.send_command(
+ HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason),
+ )
+ if result.status != HCI_COMMAND_STATUS_PENDING:
+ logger.warning(
+ 'HCI_LE_Reject_CIS_Request_Command failed: '
+ f'{HCI_Constant.error_name(result.status)}'
+ )
+ raise HCI_StatusError(result)
+
+ async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask:
+ """[LE Only] Reads remote LE supported features.
+
+ Args:
+ handle: connection handle to read LE features.
+
+ Returns:
+ LE features supported by the remote device.
+ """
+ with closing(EventWatcher()) as watcher:
+ read_feature_future: asyncio.Future[
+ LeFeatureMask
+ ] = asyncio.get_running_loop().create_future()
+
+ def on_le_remote_features(handle: int, features: int):
+ if handle == connection.handle:
+ read_feature_future.set_result(LeFeatureMask(features))
+
+ watcher.on(self.host, 'le_remote_features', on_le_remote_features)
+ await self.send_command(
+ HCI_LE_Read_Remote_Features_Command(
+ connection_handle=connection.handle
+ ),
+ )
+ return await read_feature_future
+
@host_event_handler
def on_flush(self):
self.emit('flush')
@@ -2670,6 +3543,74 @@ class Device(CompositeEventEmitter):
await self.gatt_server.indicate_subscribers(attribute, value, force)
@host_event_handler
+ def on_advertising_set_termination(
+ self,
+ status,
+ advertising_handle,
+ connection_handle,
+ number_of_completed_extended_advertising_events,
+ ):
+ # Legacy advertising set is also one of extended advertising sets.
+ if not (
+ advertising_set := self.extended_advertising_sets.get(advertising_handle)
+ ):
+ logger.warning(f'advertising set {advertising_handle} not found')
+ return
+
+ advertising_set.on_termination(status)
+
+ if status != HCI_SUCCESS:
+ logger.debug(
+ f'advertising set {advertising_handle} '
+ f'terminated with status {status}'
+ )
+ return
+
+ if not (connection := self.lookup_connection(connection_handle)):
+ logger.warning(f'no connection for handle 0x{connection_handle:04x}')
+ return
+
+ # Update the connection address.
+ connection.self_address = (
+ advertising_set.random_address
+ if advertising_set.advertising_parameters.own_address_type
+ in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM)
+ else self.public_address
+ )
+
+ # Setup auto-restart of the advertising set if needed.
+ if advertising_set.auto_restart:
+ connection.once(
+ 'disconnection',
+ lambda _: self.abort_on('flush', advertising_set.start()),
+ )
+
+ self._emit_le_connection(connection)
+
+ def _emit_le_connection(self, connection: Connection) -> None:
+ # If supported, read which PHY we're connected with before
+ # notifying listeners of the new connection.
+ if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
+
+ async def read_phy():
+ result = await self.send_command(
+ HCI_LE_Read_PHY_Command(connection_handle=connection.handle),
+ check_result=True,
+ )
+ connection.phy = ConnectionPHY(
+ result.return_parameters.tx_phy, result.return_parameters.rx_phy
+ )
+ # Emit an event to notify listeners of the new connection
+ self.emit('connection', connection)
+
+ # Do so asynchronously to not block the current event handler
+ connection.abort_on('disconnection', read_phy())
+
+ return
+
+ self.emit('connection', connection)
+
+ @host_event_handler
def on_connection(
self,
connection_handle,
@@ -2687,8 +3628,6 @@ class Device(CompositeEventEmitter):
'new connection reuses the same handle as a previous connection'
)
- peer_resolvable_address = None
-
if transport == BT_BR_EDR_TRANSPORT:
# Create a new connection
connection = self.pending_connections.pop(peer_address)
@@ -2697,70 +3636,76 @@ class Device(CompositeEventEmitter):
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
- else:
- # Resolve the peer address if we can
- if self.address_resolver:
- if peer_address.is_resolvable:
- resolved_address = self.address_resolver.resolve(peer_address)
- if resolved_address is not None:
- logger.debug(f'*** Address resolved as {resolved_address}')
- peer_resolvable_address = peer_address
- peer_address = resolved_address
-
- # Guess which own address type is used for this connection.
- # This logic is somewhat correct but may need to be improved
- # when multiple advertising are run simultaneously.
- if self.connect_own_address_type is not None:
- own_address_type = self.connect_own_address_type
- else:
- own_address_type = self.advertising_own_address_type
- # We are no longer advertising
- self.advertising = False
+ return
- if own_address_type in (
- OwnAddressType.PUBLIC,
- OwnAddressType.RESOLVABLE_OR_PUBLIC,
- ):
- self_address = self.public_address
+ # Resolve the peer address if we can
+ peer_resolvable_address = None
+ if self.address_resolver:
+ if peer_address.is_resolvable:
+ resolved_address = self.address_resolver.resolve(peer_address)
+ if resolved_address is not None:
+ logger.debug(f'*** Address resolved as {resolved_address}')
+ peer_resolvable_address = peer_address
+ peer_address = resolved_address
+
+ self_address = None
+ if role == HCI_CENTRAL_ROLE:
+ own_address_type = self.connect_own_address_type
+ assert own_address_type is not None
+ else:
+ if self.supports_le_extended_advertising:
+ # We'll know the address when the advertising set terminates,
+ # Use a temporary placeholder value for self_address.
+ self_address = Address.ANY_RANDOM
else:
- self_address = self.random_address
-
- # Create a new connection
- connection = Connection(
- self,
- connection_handle,
- transport,
- self_address,
- peer_address,
- peer_resolvable_address,
- role,
- connection_parameters,
- ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
+ # We were connected via a legacy advertisement.
+ if self.legacy_advertiser:
+ own_address_type = self.legacy_advertiser.own_address_type
+ self.legacy_advertiser = None
+ else:
+ # This should not happen, but just in case, pick a default.
+ logger.warning("connection without an advertiser")
+ self_address = self.random_address
+
+ if self_address is None:
+ self_address = (
+ self.public_address
+ if own_address_type
+ in (
+ OwnAddressType.PUBLIC,
+ OwnAddressType.RESOLVABLE_OR_PUBLIC,
+ )
+ else self.random_address
)
- self.connections[connection_handle] = connection
-
- # If supported, read which PHY we're connected with before
- # notifying listeners of the new connection.
- if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
- async def read_phy():
- result = await self.send_command(
- HCI_LE_Read_PHY_Command(connection_handle=connection_handle),
- check_result=True,
- )
- connection.phy = ConnectionPHY(
- result.return_parameters.tx_phy, result.return_parameters.rx_phy
- )
- # Emit an event to notify listeners of the new connection
- self.emit('connection', connection)
+ # Create a connection.
+ connection = Connection(
+ self,
+ connection_handle,
+ transport,
+ self_address,
+ peer_address,
+ peer_resolvable_address,
+ role,
+ connection_parameters,
+ ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
+ )
+ self.connections[connection_handle] = connection
- # Do so asynchronously to not block the current event handler
- connection.abort_on('disconnection', read_phy())
+ if (
+ role == HCI_PERIPHERAL_ROLE
+ and self.legacy_advertiser
+ and self.legacy_advertiser.auto_restart
+ ):
+ connection.once(
+ 'disconnection',
+ lambda _: self.abort_on('flush', self.legacy_advertiser.start()),
+ )
- else:
- # Emit an event to notify listeners of the new connection
- self.emit('connection', connection)
+ if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising:
+ # We can emit now, we have all the info we need
+ self._emit_le_connection(connection)
@host_event_handler
def on_connection_failure(self, transport, peer_address, error_code):
@@ -2769,10 +3714,10 @@ class Device(CompositeEventEmitter):
# For directed advertising, this means a timeout
if (
transport == BT_LE_TRANSPORT
- and self.advertising
- and self.advertising_type.is_directed
+ and self.legacy_advertiser
+ and self.legacy_advertiser.advertising_type.is_directed
):
- self.advertising = False
+ self.legacy_advertiser = None
# Notify listeners
error = core.ConnectionError(
@@ -2789,8 +3734,21 @@ class Device(CompositeEventEmitter):
def on_connection_request(self, bd_addr, class_of_device, link_type):
logger.debug(f'*** Connection request: {bd_addr}')
+ # Handle SCO request.
+ if link_type in (
+ HCI_Connection_Complete_Event.SCO_LINK_TYPE,
+ HCI_Connection_Complete_Event.ESCO_LINK_TYPE,
+ ):
+ if connection := self.find_connection_by_bd_addr(
+ bd_addr, transport=BT_BR_EDR_TRANSPORT
+ ):
+ self.emit('sco_request', connection, link_type)
+ else:
+ logger.error(f'SCO request from a non-connected device {bd_addr}')
+ return
+
# match a pending future using `bd_addr`
- if bd_addr in self.classic_pending_accepts:
+ elif bd_addr in self.classic_pending_accepts:
future, *_ = self.classic_pending_accepts.pop(bd_addr)
future.set_result((bd_addr, class_of_device, link_type))
@@ -2822,30 +3780,23 @@ class Device(CompositeEventEmitter):
)
@host_event_handler
- @with_connection_from_handle
- def on_disconnection(self, connection, reason):
- logger.debug(
- f'*** Disconnection: [0x{connection.handle:04X}] '
- f'{connection.peer_address} as {connection.role_name}, reason={reason}'
- )
- connection.emit('disconnection', reason)
-
- # Remove the connection from the map
- del self.connections[connection.handle]
-
- # Cleanup subsystems that maintain per-connection state
- self.gatt_server.on_disconnection(connection)
-
- # Restart advertising if auto-restart is enabled
- if self.auto_restart_advertising:
- logger.debug('restarting advertising')
- self.abort_on(
- 'flush',
- self.start_advertising(
- advertising_type=self.advertising_type,
- own_address_type=self.advertising_own_address_type,
- auto_restart=True,
- ),
+ def on_disconnection(self, connection_handle: int, reason: int) -> None:
+ if connection := self.connections.pop(connection_handle, None):
+ logger.debug(
+ f'*** Disconnection: [0x{connection.handle:04X}] '
+ f'{connection.peer_address} as {connection.role_name}, reason={reason}'
+ )
+ connection.emit('disconnection', reason)
+
+ # Cleanup subsystems that maintain per-connection state
+ self.gatt_server.on_disconnection(connection)
+ elif sco_link := self.sco_links.pop(connection_handle, None):
+ sco_link.emit('disconnection', reason)
+ elif cis_link := self.cis_links.pop(connection_handle, None):
+ cis_link.emit('disconnection', reason)
+ else:
+ logger.error(
+ f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***'
)
@host_event_handler
@@ -2996,7 +3947,7 @@ class Device(CompositeEventEmitter):
try:
if await connection.abort_on('disconnection', method()):
await self.host.send_command(
- HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg]
+ HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address
)
)
@@ -3005,7 +3956,7 @@ class Device(CompositeEventEmitter):
logger.warning(f'exception while confirming: {error}')
await self.host.send_command(
- HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg]
+ HCI_User_Confirmation_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
)
@@ -3026,7 +3977,7 @@ class Device(CompositeEventEmitter):
)
if number is not None:
await self.host.send_command(
- HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg]
+ HCI_User_Passkey_Request_Reply_Command(
bd_addr=connection.peer_address, numeric_value=number
)
)
@@ -3035,7 +3986,7 @@ class Device(CompositeEventEmitter):
logger.warning(f'exception while asking for pass-key: {error}')
await self.host.send_command(
- HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg]
+ HCI_User_Passkey_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
)
@@ -3124,6 +4075,107 @@ class Device(CompositeEventEmitter):
connection.emit('remote_name_failure', error)
self.emit('remote_name_failure', address, error)
+ # [Classic only]
+ @host_event_handler
+ @with_connection_from_address
+ @experimental('Only for testing.')
+ def on_sco_connection(
+ self, acl_connection: Connection, sco_handle: int, link_type: int
+ ) -> None:
+ logger.debug(
+ f'*** SCO connected: {acl_connection.peer_address}, '
+ f'sco_handle=[0x{sco_handle:04X}], '
+ f'link_type=[0x{link_type:02X}] ***'
+ )
+ sco_link = self.sco_links[sco_handle] = ScoLink(
+ device=self,
+ acl_connection=acl_connection,
+ handle=sco_handle,
+ link_type=link_type,
+ )
+ self.emit('sco_connection', sco_link)
+
+ # [Classic only]
+ @host_event_handler
+ @with_connection_from_address
+ @experimental('Only for testing.')
+ def on_sco_connection_failure(
+ self, acl_connection: Connection, status: int
+ ) -> None:
+ logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
+ self.emit('sco_connection_failure')
+
+ # [Classic only]
+ @host_event_handler
+ @experimental('Only for testing')
+ def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None:
+ if sco_link := self.sco_links.get(sco_handle):
+ sco_link.emit('pdu', packet)
+
+ # [LE only]
+ @host_event_handler
+ @with_connection_from_handle
+ @experimental('Only for testing')
+ def on_cis_request(
+ self,
+ acl_connection: Connection,
+ cis_handle: int,
+ cig_id: int,
+ cis_id: int,
+ ) -> None:
+ logger.debug(
+ f'*** CIS Request '
+ f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, '
+ f'cis_handle=[0x{cis_handle:04X}], '
+ f'cig_id=[0x{cig_id:02X}], '
+ f'cis_id=[0x{cis_id:02X}] ***'
+ )
+ # LE_CIS_Established event doesn't provide info, so we must store them here.
+ self.cis_links[cis_handle] = CisLink(
+ device=self,
+ acl_connection=acl_connection,
+ handle=cis_handle,
+ cig_id=cig_id,
+ cis_id=cis_id,
+ )
+ self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id)
+
+ # [LE only]
+ @host_event_handler
+ @experimental('Only for testing')
+ def on_cis_establishment(self, cis_handle: int) -> None:
+ cis_link = self.cis_links[cis_handle]
+ cis_link.state = CisLink.State.ESTABLISHED
+
+ assert cis_link.acl_connection
+
+ logger.debug(
+ f'*** CIS Establishment '
+ f'{cis_link.acl_connection.peer_address}, '
+ f'cis_handle=[0x{cis_handle:04X}], '
+ f'cig_id=[0x{cis_link.cig_id:02X}], '
+ f'cis_id=[0x{cis_link.cis_id:02X}] ***'
+ )
+
+ cis_link.emit('establishment')
+ self.emit('cis_establishment', cis_link)
+
+ # [LE only]
+ @host_event_handler
+ @experimental('Only for testing')
+ def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
+ logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
+ if cis_link := self.cis_links.pop(cis_handle):
+ cis_link.emit('establishment_failure')
+ self.emit('cis_establishment_failure', cis_handle, status)
+
+ # [LE only]
+ @host_event_handler
+ @experimental('Only for testing')
+ def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None:
+ if cis_link := self.cis_links.get(handle):
+ cis_link.emit('pdu', packet)
+
@host_event_handler
@with_connection_from_handle
def on_connection_encryption_change(self, connection, encryption):
@@ -3135,10 +4187,18 @@ class Device(CompositeEventEmitter):
connection.encryption = encryption
if (
not connection.authenticated
+ and connection.transport == BT_BR_EDR_TRANSPORT
and encryption == HCI_Encryption_Change_Event.AES_CCM
):
connection.authenticated = True
connection.sc = True
+ if (
+ not connection.authenticated
+ and connection.transport == BT_LE_TRANSPORT
+ and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM
+ ):
+ connection.authenticated = True
+ connection.sc = True
connection.emit('connection_encryption_change')
@host_event_handler