diff options
Diffstat (limited to 'bumble/rfcomm.py')
-rw-r--r-- | bumble/rfcomm.py | 297 |
1 files changed, 170 insertions, 127 deletions
diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py index 0176a78..02c18fa 100644 --- a/bumble/rfcomm.py +++ b/bumble/rfcomm.py @@ -15,15 +15,37 @@ # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- +from __future__ import annotations + import logging import asyncio +import enum +from typing import Callable, Dict, List, Optional, Tuple, Union, TYPE_CHECKING from pyee import EventEmitter -from typing import Optional, Tuple, Callable, Dict, Union from . import core, l2cap from .colors import color -from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError +from .core import ( + UUID, + BT_RFCOMM_PROTOCOL_ID, + BT_BR_EDR_TRANSPORT, + BT_L2CAP_PROTOCOL_ID, + InvalidStateError, + ProtocolError, +) +from .sdp import ( + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + SDP_PUBLIC_BROWSE_ROOT, + DataElement, + ServiceAttribute, +) + +if TYPE_CHECKING: + from bumble.device import Device, Connection # ----------------------------------------------------------------------------- # Logging @@ -106,6 +128,50 @@ RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30 # ----------------------------------------------------------------------------- +def make_service_sdp_records( + service_record_handle: int, channel: int, uuid: Optional[UUID] = None +) -> List[ServiceAttribute]: + """ + Create SDP records for an RFComm service given a channel number and an + optional UUID. A Service Class Attribute is included only if the UUID is not None. + """ + records = [ + ServiceAttribute( + SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, + DataElement.unsigned_integer_32(service_record_handle), + ), + ServiceAttribute( + SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), + ), + ServiceAttribute( + SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, + DataElement.sequence( + [ + DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), + DataElement.sequence( + [ + DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), + DataElement.unsigned_integer_8(channel), + ] + ), + ] + ), + ), + ] + + if uuid: + records.append( + ServiceAttribute( + SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, + DataElement.sequence([DataElement.uuid(uuid)]), + ) + ) + + return records + + +# ----------------------------------------------------------------------------- def compute_fcs(buffer: bytes) -> int: result = 0xFF for byte in buffer: @@ -149,9 +215,9 @@ class RFCOMM_Frame: return RFCOMM_FRAME_TYPE_NAMES[self.type] @staticmethod - def parse_mcc(data) -> Tuple[int, int, bytes]: + def parse_mcc(data) -> Tuple[int, bool, bytes]: mcc_type = data[0] >> 2 - c_r = (data[0] >> 1) & 1 + c_r = bool((data[0] >> 1) & 1) length = data[1] if data[1] & 1: length >>= 1 @@ -192,7 +258,7 @@ class RFCOMM_Frame: ) @staticmethod - def from_bytes(data: bytes): + def from_bytes(data: bytes) -> RFCOMM_Frame: # Extract fields dlci = (data[0] >> 2) & 0x3F c_r = (data[0] >> 1) & 0x01 @@ -215,7 +281,7 @@ class RFCOMM_Frame: return frame - def __bytes__(self): + def __bytes__(self) -> bytes: return ( bytes([self.address, self.control]) + self.length @@ -223,7 +289,7 @@ class RFCOMM_Frame: + bytes([self.fcs]) ) - def __str__(self): + def __str__(self) -> str: return ( f'{color(self.type_name(), "yellow")}' f'(c/r={self.c_r},' @@ -253,7 +319,7 @@ class RFCOMM_MCC_PN: max_frame_size: int, max_retransmissions: int, window_size: int, - ): + ) -> None: self.dlci = dlci self.cl = cl self.priority = priority @@ -263,7 +329,7 @@ class RFCOMM_MCC_PN: self.window_size = window_size @staticmethod - def from_bytes(data: bytes): + def from_bytes(data: bytes) -> RFCOMM_MCC_PN: return RFCOMM_MCC_PN( dlci=data[0], cl=data[1], @@ -274,7 +340,7 @@ class RFCOMM_MCC_PN: window_size=data[7], ) - def __bytes__(self): + def __bytes__(self) -> bytes: return bytes( [ self.dlci & 0xFF, @@ -288,7 +354,7 @@ class RFCOMM_MCC_PN: ] ) - def __str__(self): + def __str__(self) -> str: return ( f'PN(dlci={self.dlci},' f'cl={self.cl},' @@ -309,7 +375,9 @@ class RFCOMM_MCC_MSC: ic: int dv: int - def __init__(self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int): + def __init__( + self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int + ) -> None: self.dlci = dlci self.fc = fc self.rtc = rtc @@ -318,7 +386,7 @@ class RFCOMM_MCC_MSC: self.dv = dv @staticmethod - def from_bytes(data: bytes): + def from_bytes(data: bytes) -> RFCOMM_MCC_MSC: return RFCOMM_MCC_MSC( dlci=data[0] >> 2, fc=data[1] >> 1 & 1, @@ -328,7 +396,7 @@ class RFCOMM_MCC_MSC: dv=data[1] >> 7 & 1, ) - def __bytes__(self): + def __bytes__(self) -> bytes: return bytes( [ (self.dlci << 2) | 3, @@ -341,7 +409,7 @@ class RFCOMM_MCC_MSC: ] ) - def __str__(self): + def __str__(self) -> str: return ( f'MSC(dlci={self.dlci},' f'fc={self.fc},' @@ -354,29 +422,24 @@ class RFCOMM_MCC_MSC: # ----------------------------------------------------------------------------- class DLC(EventEmitter): - # States - INIT = 0x00 - CONNECTING = 0x01 - CONNECTED = 0x02 - DISCONNECTING = 0x03 - DISCONNECTED = 0x04 - RESET = 0x05 - - STATE_NAMES = { - INIT: 'INIT', - CONNECTING: 'CONNECTING', - CONNECTED: 'CONNECTED', - DISCONNECTING: 'DISCONNECTING', - DISCONNECTED: 'DISCONNECTED', - RESET: 'RESET', - } + class State(enum.IntEnum): + INIT = 0x00 + CONNECTING = 0x01 + CONNECTED = 0x02 + DISCONNECTING = 0x03 + DISCONNECTED = 0x04 + RESET = 0x05 connection_result: Optional[asyncio.Future] sink: Optional[Callable[[bytes], None]] def __init__( - self, multiplexer, dlci: int, max_frame_size: int, initial_tx_credits: int - ): + self, + multiplexer: Multiplexer, + dlci: int, + max_frame_size: int, + initial_tx_credits: int, + ) -> None: super().__init__() self.multiplexer = multiplexer self.dlci = dlci @@ -384,9 +447,9 @@ class DLC(EventEmitter): self.rx_threshold = self.rx_credits // 2 self.tx_credits = initial_tx_credits self.tx_buffer = b'' - self.state = DLC.INIT + self.state = DLC.State.INIT self.role = multiplexer.role - self.c_r = 1 if self.role == Multiplexer.INITIATOR else 0 + self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0 self.sink = None self.connection_result = None @@ -396,14 +459,8 @@ class DLC(EventEmitter): max_frame_size, self.multiplexer.l2cap_channel.mtu - max_overhead ) - @staticmethod - def state_name(state: int) -> str: - return DLC.STATE_NAMES[state] - - def change_state(self, new_state: int) -> None: - logger.debug( - f'{self} state change -> {color(self.state_name(new_state), "magenta")}' - ) + def change_state(self, new_state: State) -> None: + logger.debug(f'{self} state change -> {color(new_state.name, "magenta")}') self.state = new_state def send_frame(self, frame: RFCOMM_Frame) -> None: @@ -413,8 +470,8 @@ class DLC(EventEmitter): handler = getattr(self, f'on_{frame.type_name()}_frame'.lower()) handler(frame) - def on_sabm_frame(self, _frame) -> None: - if self.state != DLC.CONNECTING: + def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None: + if self.state != DLC.State.CONNECTING: logger.warning( color('!!! received SABM when not in CONNECTING state', 'red') ) @@ -430,11 +487,11 @@ class DLC(EventEmitter): logger.debug(f'>>> MCC MSC Command: {msc}') self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) - self.change_state(DLC.CONNECTED) + self.change_state(DLC.State.CONNECTED) self.emit('open') - def on_ua_frame(self, _frame) -> None: - if self.state != DLC.CONNECTING: + def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: + if self.state != DLC.State.CONNECTING: logger.warning( color('!!! received SABM when not in CONNECTING state', 'red') ) @@ -448,14 +505,14 @@ class DLC(EventEmitter): logger.debug(f'>>> MCC MSC Command: {msc}') self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) - self.change_state(DLC.CONNECTED) + self.change_state(DLC.State.CONNECTED) self.multiplexer.on_dlc_open_complete(self) - def on_dm_frame(self, frame) -> None: + def on_dm_frame(self, frame: RFCOMM_Frame) -> None: # TODO: handle all states pass - def on_disc_frame(self, _frame) -> None: + def on_disc_frame(self, _frame: RFCOMM_Frame) -> None: # TODO: handle all states self.send_frame(RFCOMM_Frame.ua(c_r=1 - self.c_r, dlci=self.dlci)) @@ -489,10 +546,10 @@ class DLC(EventEmitter): # Check if there's anything to send (including credits) self.process_tx() - def on_ui_frame(self, frame) -> None: + def on_ui_frame(self, frame: RFCOMM_Frame) -> None: pass - def on_mcc_msc(self, c_r, msc) -> None: + def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None: if c_r: # Command logger.debug(f'<<< MCC MSC Command: {msc}') @@ -507,15 +564,15 @@ class DLC(EventEmitter): logger.debug(f'<<< MCC MSC Response: {msc}') def connect(self) -> None: - if self.state != DLC.INIT: + if self.state != DLC.State.INIT: raise InvalidStateError('invalid state') - self.change_state(DLC.CONNECTING) + self.change_state(DLC.State.CONNECTING) self.connection_result = asyncio.get_running_loop().create_future() self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci)) def accept(self) -> None: - if self.state != DLC.INIT: + if self.state != DLC.State.INIT: raise InvalidStateError('invalid state') pn = RFCOMM_MCC_PN( @@ -530,7 +587,7 @@ class DLC(EventEmitter): mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=0, data=bytes(pn)) logger.debug(f'>>> PN Response: {pn}') self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) - self.change_state(DLC.CONNECTING) + self.change_state(DLC.State.CONNECTING) def rx_credits_needed(self) -> int: if self.rx_credits <= self.rx_threshold: @@ -592,34 +649,24 @@ class DLC(EventEmitter): # TODO pass - def __str__(self): - return f'DLC(dlci={self.dlci},state={self.state_name(self.state)})' + def __str__(self) -> str: + return f'DLC(dlci={self.dlci},state={self.state.name})' # ----------------------------------------------------------------------------- class Multiplexer(EventEmitter): - # Roles - INITIATOR = 0x00 - RESPONDER = 0x01 - - # States - INIT = 0x00 - CONNECTING = 0x01 - CONNECTED = 0x02 - OPENING = 0x03 - DISCONNECTING = 0x04 - DISCONNECTED = 0x05 - RESET = 0x06 - - STATE_NAMES = { - INIT: 'INIT', - CONNECTING: 'CONNECTING', - CONNECTED: 'CONNECTED', - OPENING: 'OPENING', - DISCONNECTING: 'DISCONNECTING', - DISCONNECTED: 'DISCONNECTED', - RESET: 'RESET', - } + class Role(enum.IntEnum): + INITIATOR = 0x00 + RESPONDER = 0x01 + + class State(enum.IntEnum): + INIT = 0x00 + CONNECTING = 0x01 + CONNECTED = 0x02 + OPENING = 0x03 + DISCONNECTING = 0x04 + DISCONNECTED = 0x05 + RESET = 0x06 connection_result: Optional[asyncio.Future] disconnection_result: Optional[asyncio.Future] @@ -627,11 +674,11 @@ class Multiplexer(EventEmitter): acceptor: Optional[Callable[[int], bool]] dlcs: Dict[int, DLC] - def __init__(self, l2cap_channel: l2cap.Channel, role: int) -> None: + def __init__(self, l2cap_channel: l2cap.Channel, role: Role) -> None: super().__init__() self.role = role self.l2cap_channel = l2cap_channel - self.state = Multiplexer.INIT + self.state = Multiplexer.State.INIT self.dlcs = {} # DLCs, by DLCI self.connection_result = None self.disconnection_result = None @@ -641,14 +688,8 @@ class Multiplexer(EventEmitter): # Become a sink for the L2CAP channel l2cap_channel.sink = self.on_pdu - @staticmethod - def state_name(state: int): - return Multiplexer.STATE_NAMES[state] - - def change_state(self, new_state: int) -> None: - logger.debug( - f'{self} state change -> {color(self.state_name(new_state), "cyan")}' - ) + def change_state(self, new_state: State) -> None: + logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') self.state = new_state def send_frame(self, frame: RFCOMM_Frame) -> None: @@ -679,28 +720,28 @@ class Multiplexer(EventEmitter): handler = getattr(self, f'on_{frame.type_name()}_frame'.lower()) handler(frame) - def on_sabm_frame(self, _frame) -> None: - if self.state != Multiplexer.INIT: + def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None: + if self.state != Multiplexer.State.INIT: logger.debug('not in INIT state, ignoring SABM') return - self.change_state(Multiplexer.CONNECTED) + self.change_state(Multiplexer.State.CONNECTED) self.send_frame(RFCOMM_Frame.ua(c_r=1, dlci=0)) - def on_ua_frame(self, _frame) -> None: - if self.state == Multiplexer.CONNECTING: - self.change_state(Multiplexer.CONNECTED) + def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: + if self.state == Multiplexer.State.CONNECTING: + self.change_state(Multiplexer.State.CONNECTED) if self.connection_result: self.connection_result.set_result(0) self.connection_result = None - elif self.state == Multiplexer.DISCONNECTING: - self.change_state(Multiplexer.DISCONNECTED) + elif self.state == Multiplexer.State.DISCONNECTING: + self.change_state(Multiplexer.State.DISCONNECTED) if self.disconnection_result: self.disconnection_result.set_result(None) self.disconnection_result = None - def on_dm_frame(self, _frame) -> None: - if self.state == Multiplexer.OPENING: - self.change_state(Multiplexer.CONNECTED) + def on_dm_frame(self, _frame: RFCOMM_Frame) -> None: + if self.state == Multiplexer.State.OPENING: + self.change_state(Multiplexer.State.CONNECTED) if self.open_result: self.open_result.set_exception( core.ConnectionError( @@ -713,10 +754,12 @@ class Multiplexer(EventEmitter): else: logger.warning(f'unexpected state for DM: {self}') - def on_disc_frame(self, _frame) -> None: - self.change_state(Multiplexer.DISCONNECTED) + def on_disc_frame(self, _frame: RFCOMM_Frame) -> None: + self.change_state(Multiplexer.State.DISCONNECTED) self.send_frame( - RFCOMM_Frame.ua(c_r=0 if self.role == Multiplexer.INITIATOR else 1, dlci=0) + RFCOMM_Frame.ua( + c_r=0 if self.role == Multiplexer.Role.INITIATOR else 1, dlci=0 + ) ) def on_uih_frame(self, frame: RFCOMM_Frame) -> None: @@ -729,11 +772,11 @@ class Multiplexer(EventEmitter): mcs = RFCOMM_MCC_MSC.from_bytes(value) self.on_mcc_msc(c_r, mcs) - def on_ui_frame(self, frame) -> None: + def on_ui_frame(self, frame: RFCOMM_Frame) -> None: pass - def on_mcc_pn(self, c_r, pn) -> None: - if c_r == 1: + def on_mcc_pn(self, c_r: bool, pn: RFCOMM_MCC_PN) -> None: + if c_r: # Command logger.debug(f'<<< PN Command: {pn}') @@ -764,14 +807,14 @@ class Multiplexer(EventEmitter): else: # Response logger.debug(f'>>> PN Response: {pn}') - if self.state == Multiplexer.OPENING: + if self.state == Multiplexer.State.OPENING: dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size) self.dlcs[pn.dlci] = dlc dlc.connect() else: logger.warning('ignoring PN response') - def on_mcc_msc(self, c_r, msc) -> None: + def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None: dlc = self.dlcs.get(msc.dlci) if dlc is None: logger.warning(f'no dlc for DLCI {msc.dlci}') @@ -779,30 +822,30 @@ class Multiplexer(EventEmitter): dlc.on_mcc_msc(c_r, msc) async def connect(self) -> None: - if self.state != Multiplexer.INIT: + if self.state != Multiplexer.State.INIT: raise InvalidStateError('invalid state') - self.change_state(Multiplexer.CONNECTING) + self.change_state(Multiplexer.State.CONNECTING) self.connection_result = asyncio.get_running_loop().create_future() self.send_frame(RFCOMM_Frame.sabm(c_r=1, dlci=0)) return await self.connection_result async def disconnect(self) -> None: - if self.state != Multiplexer.CONNECTED: + if self.state != Multiplexer.State.CONNECTED: return self.disconnection_result = asyncio.get_running_loop().create_future() - self.change_state(Multiplexer.DISCONNECTING) + self.change_state(Multiplexer.State.DISCONNECTING) self.send_frame( RFCOMM_Frame.disc( - c_r=1 if self.role == Multiplexer.INITIATOR else 0, dlci=0 + c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=0 ) ) await self.disconnection_result async def open_dlc(self, channel: int) -> DLC: - if self.state != Multiplexer.CONNECTED: - if self.state == Multiplexer.OPENING: + if self.state != Multiplexer.State.CONNECTED: + if self.state == Multiplexer.State.OPENING: raise InvalidStateError('open already in progress') raise InvalidStateError('not connected') @@ -819,10 +862,10 @@ class Multiplexer(EventEmitter): mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=1, data=bytes(pn)) logger.debug(f'>>> Sending MCC: {pn}') self.open_result = asyncio.get_running_loop().create_future() - self.change_state(Multiplexer.OPENING) + self.change_state(Multiplexer.State.OPENING) self.send_frame( RFCOMM_Frame.uih( - c_r=1 if self.role == Multiplexer.INITIATOR else 0, + c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=0, information=mcc, ) @@ -831,14 +874,14 @@ class Multiplexer(EventEmitter): self.open_result = None return result - def on_dlc_open_complete(self, dlc: DLC): + def on_dlc_open_complete(self, dlc: DLC) -> None: logger.debug(f'DLC [{dlc.dlci}] open complete') - self.change_state(Multiplexer.CONNECTED) + self.change_state(Multiplexer.State.CONNECTED) if self.open_result: self.open_result.set_result(dlc) - def __str__(self): - return f'Multiplexer(state={self.state_name(self.state)})' + def __str__(self) -> str: + return f'Multiplexer(state={self.state.name})' # ----------------------------------------------------------------------------- @@ -846,7 +889,7 @@ class Client: multiplexer: Optional[Multiplexer] l2cap_channel: Optional[l2cap.Channel] - def __init__(self, device, connection) -> None: + def __init__(self, device: Device, connection: Connection) -> None: self.device = device self.connection = connection self.l2cap_channel = None @@ -864,7 +907,7 @@ class Client: assert self.l2cap_channel is not None # Create a mutliplexer to manage DLCs with the server - self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.INITIATOR) + self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.Role.INITIATOR) # Connect the multiplexer await self.multiplexer.connect() @@ -886,7 +929,7 @@ class Client: class Server(EventEmitter): acceptors: Dict[int, Callable[[DLC], None]] - def __init__(self, device) -> None: + def __init__(self, device: Device) -> None: super().__init__() self.device = device self.multiplexer = None @@ -925,7 +968,7 @@ class Server(EventEmitter): logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}') # Create a new multiplexer for the channel - multiplexer = Multiplexer(l2cap_channel, Multiplexer.RESPONDER) + multiplexer = Multiplexer(l2cap_channel, Multiplexer.Role.RESPONDER) multiplexer.acceptor = self.accept_dlc multiplexer.on('dlc', self.on_dlc) |