aboutsummaryrefslogtreecommitdiff
path: root/examples/run_hfp_handsfree.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/run_hfp_handsfree.py')
-rw-r--r--examples/run_hfp_handsfree.py115
1 files changed, 35 insertions, 80 deletions
diff --git a/examples/run_hfp_handsfree.py b/examples/run_hfp_handsfree.py
index cef29c0..5f747fc 100644
--- a/examples/run_hfp_handsfree.py
+++ b/examples/run_hfp_handsfree.py
@@ -21,82 +21,22 @@ import os
import logging
import json
import websockets
-
+from typing import Optional
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.rfcomm import Server as RfcommServer
-from bumble.sdp import (
- DataElement,
- ServiceAttribute,
- SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
- SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
- SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
- SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
-)
-from bumble.core import (
- BT_GENERIC_AUDIO_SERVICE,
- BT_HANDSFREE_SERVICE,
- BT_L2CAP_PROTOCOL_ID,
- BT_RFCOMM_PROTOCOL_ID,
-)
-from bumble.hfp import HfpProtocol
-
-
-# -----------------------------------------------------------------------------
-def make_sdp_records(rfcomm_channel):
- return {
- 0x00010001: [
- ServiceAttribute(
- SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
- DataElement.unsigned_integer_32(0x00010001),
- ),
- ServiceAttribute(
- SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
- DataElement.sequence(
- [
- DataElement.uuid(BT_HANDSFREE_SERVICE),
- DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
- ]
- ),
- ),
- ServiceAttribute(
- SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
- DataElement.sequence(
- [
- DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
- DataElement.sequence(
- [
- DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
- DataElement.unsigned_integer_8(rfcomm_channel),
- ]
- ),
- ]
- ),
- ),
- ServiceAttribute(
- SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
- DataElement.sequence(
- [
- DataElement.sequence(
- [
- DataElement.uuid(BT_HANDSFREE_SERVICE),
- DataElement.unsigned_integer_16(0x0105),
- ]
- )
- ]
- ),
- ),
- ]
- }
+from bumble import hfp
+from bumble.hfp import HfProtocol
# -----------------------------------------------------------------------------
class UiServer:
- protocol = None
+ protocol: Optional[HfProtocol] = None
async def start(self):
- # Start a Websocket server to receive events from a web page
+ """Start a Websocket server to receive events from a web page."""
+
async def serve(websocket, _path):
while True:
try:
@@ -107,7 +47,7 @@ class UiServer:
message_type = parsed['type']
if message_type == 'at_command':
if self.protocol is not None:
- self.protocol.send_command_line(parsed['command'])
+ await self.protocol.execute_command(parsed['command'])
except websockets.exceptions.ConnectionClosedOK:
pass
@@ -117,19 +57,11 @@ class UiServer:
# -----------------------------------------------------------------------------
-async def protocol_loop(protocol):
- await protocol.initialize_service()
-
- while True:
- await (protocol.next_line())
-
-
-# -----------------------------------------------------------------------------
-def on_dlc(dlc):
+def on_dlc(dlc, configuration: hfp.Configuration):
print('*** DLC connected', dlc)
- protocol = HfpProtocol(dlc)
+ protocol = HfProtocol(dlc, configuration)
UiServer.protocol = protocol
- asyncio.create_task(protocol_loop(protocol))
+ asyncio.create_task(protocol.run())
# -----------------------------------------------------------------------------
@@ -143,6 +75,27 @@ async def main():
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
+ # Hands-Free profile configuration.
+ # TODO: load configuration from file.
+ configuration = hfp.Configuration(
+ supported_hf_features=[
+ hfp.HfFeature.THREE_WAY_CALLING,
+ hfp.HfFeature.REMOTE_VOLUME_CONTROL,
+ hfp.HfFeature.ENHANCED_CALL_STATUS,
+ hfp.HfFeature.ENHANCED_CALL_CONTROL,
+ hfp.HfFeature.CODEC_NEGOTIATION,
+ hfp.HfFeature.HF_INDICATORS,
+ hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
+ ],
+ supported_hf_indicators=[
+ hfp.HfIndicator.BATTERY_LEVEL,
+ ],
+ supported_audio_codecs=[
+ hfp.AudioCodec.CVSD,
+ hfp.AudioCodec.MSBC,
+ ],
+ )
+
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device.classic_enabled = True
@@ -151,11 +104,13 @@ async def main():
rfcomm_server = RfcommServer(device)
# Listen for incoming DLC connections
- channel_number = rfcomm_server.listen(on_dlc)
+ channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration))
print(f'### Listening for connection on channel {channel_number}')
# Advertise the HFP RFComm channel in the SDP
- device.sdp_service_records = make_sdp_records(channel_number)
+ device.sdp_service_records = {
+ 0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration)
+ }
# Let's go!
await device.power_on()