aboutsummaryrefslogtreecommitdiff
path: root/bumble/rfcomm.py
diff options
context:
space:
mode:
Diffstat (limited to 'bumble/rfcomm.py')
-rw-r--r--bumble/rfcomm.py297
1 files changed, 170 insertions, 127 deletions
diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py
index 0176a78..02c18fa 100644
--- a/bumble/rfcomm.py
+++ b/bumble/rfcomm.py
@@ -15,15 +15,37 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
+from __future__ import annotations
+
import logging
import asyncio
+import enum
+from typing import Callable, Dict, List, Optional, Tuple, Union, TYPE_CHECKING
from pyee import EventEmitter
-from typing import Optional, Tuple, Callable, Dict, Union
from . import core, l2cap
from .colors import color
-from .core import BT_BR_EDR_TRANSPORT, InvalidStateError, ProtocolError
+from .core import (
+ UUID,
+ BT_RFCOMM_PROTOCOL_ID,
+ BT_BR_EDR_TRANSPORT,
+ BT_L2CAP_PROTOCOL_ID,
+ InvalidStateError,
+ ProtocolError,
+)
+from .sdp import (
+ SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
+ SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
+ SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
+ SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
+ SDP_PUBLIC_BROWSE_ROOT,
+ DataElement,
+ ServiceAttribute,
+)
+
+if TYPE_CHECKING:
+ from bumble.device import Device, Connection
# -----------------------------------------------------------------------------
# Logging
@@ -106,6 +128,50 @@ RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
# -----------------------------------------------------------------------------
+def make_service_sdp_records(
+ service_record_handle: int, channel: int, uuid: Optional[UUID] = None
+) -> List[ServiceAttribute]:
+ """
+ Create SDP records for an RFComm service given a channel number and an
+ optional UUID. A Service Class Attribute is included only if the UUID is not None.
+ """
+ records = [
+ ServiceAttribute(
+ SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
+ DataElement.unsigned_integer_32(service_record_handle),
+ ),
+ ServiceAttribute(
+ SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
+ DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
+ ),
+ ServiceAttribute(
+ SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
+ DataElement.sequence(
+ [
+ DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
+ DataElement.sequence(
+ [
+ DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
+ DataElement.unsigned_integer_8(channel),
+ ]
+ ),
+ ]
+ ),
+ ),
+ ]
+
+ if uuid:
+ records.append(
+ ServiceAttribute(
+ SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
+ DataElement.sequence([DataElement.uuid(uuid)]),
+ )
+ )
+
+ return records
+
+
+# -----------------------------------------------------------------------------
def compute_fcs(buffer: bytes) -> int:
result = 0xFF
for byte in buffer:
@@ -149,9 +215,9 @@ class RFCOMM_Frame:
return RFCOMM_FRAME_TYPE_NAMES[self.type]
@staticmethod
- def parse_mcc(data) -> Tuple[int, int, bytes]:
+ def parse_mcc(data) -> Tuple[int, bool, bytes]:
mcc_type = data[0] >> 2
- c_r = (data[0] >> 1) & 1
+ c_r = bool((data[0] >> 1) & 1)
length = data[1]
if data[1] & 1:
length >>= 1
@@ -192,7 +258,7 @@ class RFCOMM_Frame:
)
@staticmethod
- def from_bytes(data: bytes):
+ def from_bytes(data: bytes) -> RFCOMM_Frame:
# Extract fields
dlci = (data[0] >> 2) & 0x3F
c_r = (data[0] >> 1) & 0x01
@@ -215,7 +281,7 @@ class RFCOMM_Frame:
return frame
- def __bytes__(self):
+ def __bytes__(self) -> bytes:
return (
bytes([self.address, self.control])
+ self.length
@@ -223,7 +289,7 @@ class RFCOMM_Frame:
+ bytes([self.fcs])
)
- def __str__(self):
+ def __str__(self) -> str:
return (
f'{color(self.type_name(), "yellow")}'
f'(c/r={self.c_r},'
@@ -253,7 +319,7 @@ class RFCOMM_MCC_PN:
max_frame_size: int,
max_retransmissions: int,
window_size: int,
- ):
+ ) -> None:
self.dlci = dlci
self.cl = cl
self.priority = priority
@@ -263,7 +329,7 @@ class RFCOMM_MCC_PN:
self.window_size = window_size
@staticmethod
- def from_bytes(data: bytes):
+ def from_bytes(data: bytes) -> RFCOMM_MCC_PN:
return RFCOMM_MCC_PN(
dlci=data[0],
cl=data[1],
@@ -274,7 +340,7 @@ class RFCOMM_MCC_PN:
window_size=data[7],
)
- def __bytes__(self):
+ def __bytes__(self) -> bytes:
return bytes(
[
self.dlci & 0xFF,
@@ -288,7 +354,7 @@ class RFCOMM_MCC_PN:
]
)
- def __str__(self):
+ def __str__(self) -> str:
return (
f'PN(dlci={self.dlci},'
f'cl={self.cl},'
@@ -309,7 +375,9 @@ class RFCOMM_MCC_MSC:
ic: int
dv: int
- def __init__(self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int):
+ def __init__(
+ self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int
+ ) -> None:
self.dlci = dlci
self.fc = fc
self.rtc = rtc
@@ -318,7 +386,7 @@ class RFCOMM_MCC_MSC:
self.dv = dv
@staticmethod
- def from_bytes(data: bytes):
+ def from_bytes(data: bytes) -> RFCOMM_MCC_MSC:
return RFCOMM_MCC_MSC(
dlci=data[0] >> 2,
fc=data[1] >> 1 & 1,
@@ -328,7 +396,7 @@ class RFCOMM_MCC_MSC:
dv=data[1] >> 7 & 1,
)
- def __bytes__(self):
+ def __bytes__(self) -> bytes:
return bytes(
[
(self.dlci << 2) | 3,
@@ -341,7 +409,7 @@ class RFCOMM_MCC_MSC:
]
)
- def __str__(self):
+ def __str__(self) -> str:
return (
f'MSC(dlci={self.dlci},'
f'fc={self.fc},'
@@ -354,29 +422,24 @@ class RFCOMM_MCC_MSC:
# -----------------------------------------------------------------------------
class DLC(EventEmitter):
- # States
- INIT = 0x00
- CONNECTING = 0x01
- CONNECTED = 0x02
- DISCONNECTING = 0x03
- DISCONNECTED = 0x04
- RESET = 0x05
-
- STATE_NAMES = {
- INIT: 'INIT',
- CONNECTING: 'CONNECTING',
- CONNECTED: 'CONNECTED',
- DISCONNECTING: 'DISCONNECTING',
- DISCONNECTED: 'DISCONNECTED',
- RESET: 'RESET',
- }
+ class State(enum.IntEnum):
+ INIT = 0x00
+ CONNECTING = 0x01
+ CONNECTED = 0x02
+ DISCONNECTING = 0x03
+ DISCONNECTED = 0x04
+ RESET = 0x05
connection_result: Optional[asyncio.Future]
sink: Optional[Callable[[bytes], None]]
def __init__(
- self, multiplexer, dlci: int, max_frame_size: int, initial_tx_credits: int
- ):
+ self,
+ multiplexer: Multiplexer,
+ dlci: int,
+ max_frame_size: int,
+ initial_tx_credits: int,
+ ) -> None:
super().__init__()
self.multiplexer = multiplexer
self.dlci = dlci
@@ -384,9 +447,9 @@ class DLC(EventEmitter):
self.rx_threshold = self.rx_credits // 2
self.tx_credits = initial_tx_credits
self.tx_buffer = b''
- self.state = DLC.INIT
+ self.state = DLC.State.INIT
self.role = multiplexer.role
- self.c_r = 1 if self.role == Multiplexer.INITIATOR else 0
+ self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.sink = None
self.connection_result = None
@@ -396,14 +459,8 @@ class DLC(EventEmitter):
max_frame_size, self.multiplexer.l2cap_channel.mtu - max_overhead
)
- @staticmethod
- def state_name(state: int) -> str:
- return DLC.STATE_NAMES[state]
-
- def change_state(self, new_state: int) -> None:
- logger.debug(
- f'{self} state change -> {color(self.state_name(new_state), "magenta")}'
- )
+ def change_state(self, new_state: State) -> None:
+ logger.debug(f'{self} state change -> {color(new_state.name, "magenta")}')
self.state = new_state
def send_frame(self, frame: RFCOMM_Frame) -> None:
@@ -413,8 +470,8 @@ class DLC(EventEmitter):
handler = getattr(self, f'on_{frame.type_name()}_frame'.lower())
handler(frame)
- def on_sabm_frame(self, _frame) -> None:
- if self.state != DLC.CONNECTING:
+ def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
+ if self.state != DLC.State.CONNECTING:
logger.warning(
color('!!! received SABM when not in CONNECTING state', 'red')
)
@@ -430,11 +487,11 @@ class DLC(EventEmitter):
logger.debug(f'>>> MCC MSC Command: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
- self.change_state(DLC.CONNECTED)
+ self.change_state(DLC.State.CONNECTED)
self.emit('open')
- def on_ua_frame(self, _frame) -> None:
- if self.state != DLC.CONNECTING:
+ def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
+ if self.state != DLC.State.CONNECTING:
logger.warning(
color('!!! received SABM when not in CONNECTING state', 'red')
)
@@ -448,14 +505,14 @@ class DLC(EventEmitter):
logger.debug(f'>>> MCC MSC Command: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
- self.change_state(DLC.CONNECTED)
+ self.change_state(DLC.State.CONNECTED)
self.multiplexer.on_dlc_open_complete(self)
- def on_dm_frame(self, frame) -> None:
+ def on_dm_frame(self, frame: RFCOMM_Frame) -> None:
# TODO: handle all states
pass
- def on_disc_frame(self, _frame) -> None:
+ def on_disc_frame(self, _frame: RFCOMM_Frame) -> None:
# TODO: handle all states
self.send_frame(RFCOMM_Frame.ua(c_r=1 - self.c_r, dlci=self.dlci))
@@ -489,10 +546,10 @@ class DLC(EventEmitter):
# Check if there's anything to send (including credits)
self.process_tx()
- def on_ui_frame(self, frame) -> None:
+ def on_ui_frame(self, frame: RFCOMM_Frame) -> None:
pass
- def on_mcc_msc(self, c_r, msc) -> None:
+ def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None:
if c_r:
# Command
logger.debug(f'<<< MCC MSC Command: {msc}')
@@ -507,15 +564,15 @@ class DLC(EventEmitter):
logger.debug(f'<<< MCC MSC Response: {msc}')
def connect(self) -> None:
- if self.state != DLC.INIT:
+ if self.state != DLC.State.INIT:
raise InvalidStateError('invalid state')
- self.change_state(DLC.CONNECTING)
+ self.change_state(DLC.State.CONNECTING)
self.connection_result = asyncio.get_running_loop().create_future()
self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci))
def accept(self) -> None:
- if self.state != DLC.INIT:
+ if self.state != DLC.State.INIT:
raise InvalidStateError('invalid state')
pn = RFCOMM_MCC_PN(
@@ -530,7 +587,7 @@ class DLC(EventEmitter):
mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=0, data=bytes(pn))
logger.debug(f'>>> PN Response: {pn}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
- self.change_state(DLC.CONNECTING)
+ self.change_state(DLC.State.CONNECTING)
def rx_credits_needed(self) -> int:
if self.rx_credits <= self.rx_threshold:
@@ -592,34 +649,24 @@ class DLC(EventEmitter):
# TODO
pass
- def __str__(self):
- return f'DLC(dlci={self.dlci},state={self.state_name(self.state)})'
+ def __str__(self) -> str:
+ return f'DLC(dlci={self.dlci},state={self.state.name})'
# -----------------------------------------------------------------------------
class Multiplexer(EventEmitter):
- # Roles
- INITIATOR = 0x00
- RESPONDER = 0x01
-
- # States
- INIT = 0x00
- CONNECTING = 0x01
- CONNECTED = 0x02
- OPENING = 0x03
- DISCONNECTING = 0x04
- DISCONNECTED = 0x05
- RESET = 0x06
-
- STATE_NAMES = {
- INIT: 'INIT',
- CONNECTING: 'CONNECTING',
- CONNECTED: 'CONNECTED',
- OPENING: 'OPENING',
- DISCONNECTING: 'DISCONNECTING',
- DISCONNECTED: 'DISCONNECTED',
- RESET: 'RESET',
- }
+ class Role(enum.IntEnum):
+ INITIATOR = 0x00
+ RESPONDER = 0x01
+
+ class State(enum.IntEnum):
+ INIT = 0x00
+ CONNECTING = 0x01
+ CONNECTED = 0x02
+ OPENING = 0x03
+ DISCONNECTING = 0x04
+ DISCONNECTED = 0x05
+ RESET = 0x06
connection_result: Optional[asyncio.Future]
disconnection_result: Optional[asyncio.Future]
@@ -627,11 +674,11 @@ class Multiplexer(EventEmitter):
acceptor: Optional[Callable[[int], bool]]
dlcs: Dict[int, DLC]
- def __init__(self, l2cap_channel: l2cap.Channel, role: int) -> None:
+ def __init__(self, l2cap_channel: l2cap.Channel, role: Role) -> None:
super().__init__()
self.role = role
self.l2cap_channel = l2cap_channel
- self.state = Multiplexer.INIT
+ self.state = Multiplexer.State.INIT
self.dlcs = {} # DLCs, by DLCI
self.connection_result = None
self.disconnection_result = None
@@ -641,14 +688,8 @@ class Multiplexer(EventEmitter):
# Become a sink for the L2CAP channel
l2cap_channel.sink = self.on_pdu
- @staticmethod
- def state_name(state: int):
- return Multiplexer.STATE_NAMES[state]
-
- def change_state(self, new_state: int) -> None:
- logger.debug(
- f'{self} state change -> {color(self.state_name(new_state), "cyan")}'
- )
+ def change_state(self, new_state: State) -> None:
+ logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
self.state = new_state
def send_frame(self, frame: RFCOMM_Frame) -> None:
@@ -679,28 +720,28 @@ class Multiplexer(EventEmitter):
handler = getattr(self, f'on_{frame.type_name()}_frame'.lower())
handler(frame)
- def on_sabm_frame(self, _frame) -> None:
- if self.state != Multiplexer.INIT:
+ def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
+ if self.state != Multiplexer.State.INIT:
logger.debug('not in INIT state, ignoring SABM')
return
- self.change_state(Multiplexer.CONNECTED)
+ self.change_state(Multiplexer.State.CONNECTED)
self.send_frame(RFCOMM_Frame.ua(c_r=1, dlci=0))
- def on_ua_frame(self, _frame) -> None:
- if self.state == Multiplexer.CONNECTING:
- self.change_state(Multiplexer.CONNECTED)
+ def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
+ if self.state == Multiplexer.State.CONNECTING:
+ self.change_state(Multiplexer.State.CONNECTED)
if self.connection_result:
self.connection_result.set_result(0)
self.connection_result = None
- elif self.state == Multiplexer.DISCONNECTING:
- self.change_state(Multiplexer.DISCONNECTED)
+ elif self.state == Multiplexer.State.DISCONNECTING:
+ self.change_state(Multiplexer.State.DISCONNECTED)
if self.disconnection_result:
self.disconnection_result.set_result(None)
self.disconnection_result = None
- def on_dm_frame(self, _frame) -> None:
- if self.state == Multiplexer.OPENING:
- self.change_state(Multiplexer.CONNECTED)
+ def on_dm_frame(self, _frame: RFCOMM_Frame) -> None:
+ if self.state == Multiplexer.State.OPENING:
+ self.change_state(Multiplexer.State.CONNECTED)
if self.open_result:
self.open_result.set_exception(
core.ConnectionError(
@@ -713,10 +754,12 @@ class Multiplexer(EventEmitter):
else:
logger.warning(f'unexpected state for DM: {self}')
- def on_disc_frame(self, _frame) -> None:
- self.change_state(Multiplexer.DISCONNECTED)
+ def on_disc_frame(self, _frame: RFCOMM_Frame) -> None:
+ self.change_state(Multiplexer.State.DISCONNECTED)
self.send_frame(
- RFCOMM_Frame.ua(c_r=0 if self.role == Multiplexer.INITIATOR else 1, dlci=0)
+ RFCOMM_Frame.ua(
+ c_r=0 if self.role == Multiplexer.Role.INITIATOR else 1, dlci=0
+ )
)
def on_uih_frame(self, frame: RFCOMM_Frame) -> None:
@@ -729,11 +772,11 @@ class Multiplexer(EventEmitter):
mcs = RFCOMM_MCC_MSC.from_bytes(value)
self.on_mcc_msc(c_r, mcs)
- def on_ui_frame(self, frame) -> None:
+ def on_ui_frame(self, frame: RFCOMM_Frame) -> None:
pass
- def on_mcc_pn(self, c_r, pn) -> None:
- if c_r == 1:
+ def on_mcc_pn(self, c_r: bool, pn: RFCOMM_MCC_PN) -> None:
+ if c_r:
# Command
logger.debug(f'<<< PN Command: {pn}')
@@ -764,14 +807,14 @@ class Multiplexer(EventEmitter):
else:
# Response
logger.debug(f'>>> PN Response: {pn}')
- if self.state == Multiplexer.OPENING:
+ if self.state == Multiplexer.State.OPENING:
dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size)
self.dlcs[pn.dlci] = dlc
dlc.connect()
else:
logger.warning('ignoring PN response')
- def on_mcc_msc(self, c_r, msc) -> None:
+ def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None:
dlc = self.dlcs.get(msc.dlci)
if dlc is None:
logger.warning(f'no dlc for DLCI {msc.dlci}')
@@ -779,30 +822,30 @@ class Multiplexer(EventEmitter):
dlc.on_mcc_msc(c_r, msc)
async def connect(self) -> None:
- if self.state != Multiplexer.INIT:
+ if self.state != Multiplexer.State.INIT:
raise InvalidStateError('invalid state')
- self.change_state(Multiplexer.CONNECTING)
+ self.change_state(Multiplexer.State.CONNECTING)
self.connection_result = asyncio.get_running_loop().create_future()
self.send_frame(RFCOMM_Frame.sabm(c_r=1, dlci=0))
return await self.connection_result
async def disconnect(self) -> None:
- if self.state != Multiplexer.CONNECTED:
+ if self.state != Multiplexer.State.CONNECTED:
return
self.disconnection_result = asyncio.get_running_loop().create_future()
- self.change_state(Multiplexer.DISCONNECTING)
+ self.change_state(Multiplexer.State.DISCONNECTING)
self.send_frame(
RFCOMM_Frame.disc(
- c_r=1 if self.role == Multiplexer.INITIATOR else 0, dlci=0
+ c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=0
)
)
await self.disconnection_result
async def open_dlc(self, channel: int) -> DLC:
- if self.state != Multiplexer.CONNECTED:
- if self.state == Multiplexer.OPENING:
+ if self.state != Multiplexer.State.CONNECTED:
+ if self.state == Multiplexer.State.OPENING:
raise InvalidStateError('open already in progress')
raise InvalidStateError('not connected')
@@ -819,10 +862,10 @@ class Multiplexer(EventEmitter):
mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=1, data=bytes(pn))
logger.debug(f'>>> Sending MCC: {pn}')
self.open_result = asyncio.get_running_loop().create_future()
- self.change_state(Multiplexer.OPENING)
+ self.change_state(Multiplexer.State.OPENING)
self.send_frame(
RFCOMM_Frame.uih(
- c_r=1 if self.role == Multiplexer.INITIATOR else 0,
+ c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0,
dlci=0,
information=mcc,
)
@@ -831,14 +874,14 @@ class Multiplexer(EventEmitter):
self.open_result = None
return result
- def on_dlc_open_complete(self, dlc: DLC):
+ def on_dlc_open_complete(self, dlc: DLC) -> None:
logger.debug(f'DLC [{dlc.dlci}] open complete')
- self.change_state(Multiplexer.CONNECTED)
+ self.change_state(Multiplexer.State.CONNECTED)
if self.open_result:
self.open_result.set_result(dlc)
- def __str__(self):
- return f'Multiplexer(state={self.state_name(self.state)})'
+ def __str__(self) -> str:
+ return f'Multiplexer(state={self.state.name})'
# -----------------------------------------------------------------------------
@@ -846,7 +889,7 @@ class Client:
multiplexer: Optional[Multiplexer]
l2cap_channel: Optional[l2cap.Channel]
- def __init__(self, device, connection) -> None:
+ def __init__(self, device: Device, connection: Connection) -> None:
self.device = device
self.connection = connection
self.l2cap_channel = None
@@ -864,7 +907,7 @@ class Client:
assert self.l2cap_channel is not None
# Create a mutliplexer to manage DLCs with the server
- self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.INITIATOR)
+ self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.Role.INITIATOR)
# Connect the multiplexer
await self.multiplexer.connect()
@@ -886,7 +929,7 @@ class Client:
class Server(EventEmitter):
acceptors: Dict[int, Callable[[DLC], None]]
- def __init__(self, device) -> None:
+ def __init__(self, device: Device) -> None:
super().__init__()
self.device = device
self.multiplexer = None
@@ -925,7 +968,7 @@ class Server(EventEmitter):
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
# Create a new multiplexer for the channel
- multiplexer = Multiplexer(l2cap_channel, Multiplexer.RESPONDER)
+ multiplexer = Multiplexer(l2cap_channel, Multiplexer.Role.RESPONDER)
multiplexer.acceptor = self.accept_dlc
multiplexer.on('dlc', self.on_dlc)