diff --git a/tests/bluetooth/classic/l2cap/CMakeLists.txt b/tests/bluetooth/classic/l2cap/CMakeLists.txt new file mode 100644 index 000000000000..89de0b86386a --- /dev/null +++ b/tests/bluetooth/classic/l2cap/CMakeLists.txt @@ -0,0 +1,10 @@ +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +set(NO_QEMU_SERIAL_BT_SERVER 1) + +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(bluetooth) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/bluetooth/classic/l2cap/README.rst b/tests/bluetooth/classic/l2cap/README.rst new file mode 100644 index 000000000000..6dade18ffaf2 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/README.rst @@ -0,0 +1,74 @@ +.. _bluetooth_classic_l2cap_server_tests: + +Bluetooth Classic L2cap Server Tests +################################## + +Overview +******** + +This test suite uses ``bumble`` for testing Bluetooth Classic communication between a host +PC (running :ref:`Twister `) and a device under test (DUT) running Zephyr. + +Prerequisites +************* + +The test suite has the following prerequisites: + +* The ``bumble`` library installed on the host PC. +The Bluetooth Classic controller on PC side is required. Refer to getting started of `bumble`_ +for details. + +The HCI transport for ``bumble`` can be configured as follows: + +* A specific configuration context can be provided along with the ``usb_hci`` fixture separated by + a ``:`` (i.e. specify fixture ``usb_hci:usb:0`` to use the ``usb:0`` as hci transport for + ``bumble``). +* The configuration context can be overridden using the `hci transport`_ can be provided using the + ``--hci-transport`` test suite argument (i.e. run ``twister`` with the + ``--pytest-args=--hci-transport=usb:0`` argument to use the ``usb:0`` as hci transport for + ``bumble``). + +Building and Running +******************** + +Running on mimxrt1170_evk@B/mimxrt1176/cm7 +========================================== + +Running the test suite on :ref:`mimxrt1170_evk` relies on configuration of ``bumble``. + +On the host PC, a HCI transport needs to be required. Refer to `bumble platforms`_ page of +``bumble`` for details. + +For example, on windows, a PTS dongle is used. After `WinUSB driver`_ has been installed, +the HCI transport would be USB transport interface ``usb:``. + +If the HCI transport is ``usb:0`` and debug console port is ``COM4``, the test suite can be +launched using Twister: + +.. code-block:: shell + + west twister -v -p mimxrt1170_evk@B/mimxrt1176/cm7 --device-testing --device-serial COM4 -T tests/bluetooth/classic/l2cap -O l2cap_s --force-platform --west-flash --west-runner=jlink -X usb_hci:usb:0 + +Running on Hardware +=================== + +Running the test suite on hardware requires a HCI transport connected to the host PC. + +The test suite can be launched using Twister. Below is an example for running on the +:zephyr:board:`mimxrt1170_evk@B/mimxrt1176/cm7`: + +.. code-block:: shell + + west twister -v -p mimxrt1170_evk@B/mimxrt1176/cm7 --device-testing --device-serial COM4 -T tests/bluetooth/classic/l2cap -O l2cap_s --force-platform --west-flash --west-runner=jlink -X usb_hci:usb:0 + +.. _bumble: + https://google.github.io/bumble/getting_started.html + +.. _hci transport: + https://google.github.io/bumble/transports/index.html + +.. _bumble platforms: + https://google.github.io/bumble/platforms/index.html + +.. _WinUSB driver: + https://google.github.io/bumble/platforms/windows.html diff --git a/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf b/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf new file mode 100644 index 000000000000..289b5a597386 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf @@ -0,0 +1,10 @@ +#select NXP NW612 Chipset +CONFIG_BT_NXP_NW612=y + +CONFIG_BT_SETTINGS=n +CONFIG_FLASH=n +CONFIG_FLASH_MAP=n +CONFIG_NVS=n +CONFIG_SETTINGS=n + +CONFIG_ENTROPY_GENERATOR=y diff --git a/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay b/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay new file mode 100644 index 000000000000..96ef63ad60b2 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay @@ -0,0 +1,11 @@ +/* + * Copyright 2024 NXP + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/ { + chosen { + zephyr,sram = &dtcm; + }; +}; diff --git a/tests/bluetooth/classic/l2cap/prj.conf b/tests/bluetooth/classic/l2cap/prj.conf new file mode 100644 index 000000000000..2230d1bcbe77 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/prj.conf @@ -0,0 +1,17 @@ +CONFIG_BT=y +CONFIG_BT_CLASSIC=y +CONFIG_BT_SHELL=y +CONFIG_LOG=y +CONFIG_DEBUG=y +CONFIG_ZTEST=y +CONFIG_BT_L2CAP_DYNAMIC_CHANNEL=y +CONFIG_BT_L2CAP_RET=y +CONFIG_BT_L2CAP_FC=y +CONFIG_BT_L2CAP_ENH_RET=y +CONFIG_BT_L2CAP_STREAM=y +CONFIG_BT_L2CAP_FCS=y +CONFIG_BT_L2CAP_EXT_WIN_SIZE=y +CONFIG_BT_L2CAP_MAX_WINDOW_SIZE=5 +CONFIG_BT_DEVICE_NAME="L2CAP_BR" +CONFIG_BT_CREATE_CONN_TIMEOUT=30 +CONFIG_BT_PAGE_TIMEOUT=0xFFFF diff --git a/tests/bluetooth/classic/l2cap/pytest/conftest.py b/tests/bluetooth/classic/l2cap/pytest/conftest.py new file mode 100644 index 000000000000..f9ad92a35435 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/pytest/conftest.py @@ -0,0 +1,57 @@ +# Copyright 2024 NXP +# +# SPDX-License-Identifier: Apache-2.0 +import re + +import pytest +from test_l2cap_common import L2CAP_SERVER_PSM, MODE, logger +from twister_harness import DeviceAdapter, Shell + + +def pytest_addoption(parser) -> None: + """Add local parser options to pytest.""" + parser.addoption('--hci-transport', default=None, help='Configuration HCI transport for bumble') + + +@pytest.fixture(name='initialize', scope='session') +def fixture_initialize(request, shell: Shell, dut: DeviceAdapter): + """Session initializtion""" + # Get HCI transport for bumble + hci = request.config.getoption('--hci-transport') + + if hci is None: + for fixture in dut.device_config.fixtures: + if fixture.startswith('usb_hci:'): + hci = fixture.split(sep=':', maxsplit=1)[1] + break + + assert hci is not None + + lines = shell.exec_command("bt init") + lines = dut.readlines_until("Bluetooth initialized") + regex = r"Identity: (?P(.*\s\(.*\)))" + bd_addr = None + for line in lines: + logger.info(f"Shell log {line}") + m = re.search(regex, line) + if m: + bd_addr = m.group('bd_addr') + + if bd_addr is None: + logger.error('Fail to get IUT BD address') + raise AssertionError + + lines = shell.exec_command("br pscan on") + lines = shell.exec_command("br iscan on") + logger.info('initialized') + + lines = shell.exec_command(f"l2cap_br register {format(L2CAP_SERVER_PSM, 'x')} {MODE}") + logger.info("l2cap server register") + return hci, bd_addr + + +@pytest.fixture +def l2cap_br_dut(initialize): + logger.info('Start running testcase') + yield initialize + logger.info('Done') diff --git a/tests/bluetooth/classic/l2cap/pytest/test_l2cap_common.py b/tests/bluetooth/classic/l2cap/pytest/test_l2cap_common.py new file mode 100644 index 000000000000..c4d606e5935e --- /dev/null +++ b/tests/bluetooth/classic/l2cap/pytest/test_l2cap_common.py @@ -0,0 +1,115 @@ +# Copyright 2025 NXP +# +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import logging + +from bumble.core import BT_BR_EDR_TRANSPORT +from bumble.hci import ( + HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, +) +from bumble.pairing import PairingDelegate + +logger = logging.getLogger(__name__) + +L2CAP_SERVER_PSM = 0x1001 +MODE = "basic" +L2CAP_CHAN_IUT_ID = 0 +MAX_MTU = 0xFFFF +MIN_MTU = 0x30 +STRESS_TEST_MAX_COUNT = 50 + + +class Delegate(PairingDelegate): + def __init__( + self, + dut, + io_capability, + ): + super().__init__( + io_capability, + ) + self.dut = dut + + +async def device_power_on(device) -> None: + while True: + try: + await device.power_on() + break + except Exception: + continue + + +async def wait_for_shell_response(dut, message): + found = False + lines = [] + try: + while found is False: + read_lines = dut.readlines() + for line in read_lines: + if message in line: + found = True + break + lines = lines + read_lines + await asyncio.sleep(0.1) + except Exception as e: + logger.error(f'{e}!', exc_info=True) + raise e + return found, lines + + +async def send_cmd_to_iut(shell, dut, cmd, parse=None, wait=True): + found = False + lines = shell.exec_command(cmd) + if wait: + if parse is not None: + found, lines = await wait_for_shell_response(dut, parse) + else: + found = True + else: + found = False + if parse is not None: + for line in lines: + if parse in line: + found = True + break + else: + found = True + logger.info(f'{lines}') + assert found is True + return lines + + +async def bumble_acl_connect(shell, dut, device, target_address): + connection = None + try: + connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) + logger.info(f'=== Connected to {connection.peer_address}!') + except Exception as e: + logger.error(f'Fail to connect to {target_address}!') + raise e + return connection + + +async def bumble_acl_disconnect(shell, dut, device, connection): + await device.disconnect( + connection, reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR + ) + found, lines = await wait_for_shell_response(dut, "Disconnected:") + logger.info(f'lines : {lines}') + assert found is True + return found, lines + + +async def bumble_l2cap_disconnect(shell, dut, incoming_channel, iut_id=L2CAP_CHAN_IUT_ID): + try: + await incoming_channel.disconnect() + except Exception as e: + logger.error('Fail to send l2cap disconnect command!') + raise e + assert incoming_channel.disconnection_result is None + + found, _ = await wait_for_shell_response(dut, f"Channel {iut_id} disconnected") + assert found is True diff --git a/tests/bluetooth/classic/l2cap/pytest/test_l2cap_server.py b/tests/bluetooth/classic/l2cap/pytest/test_l2cap_server.py new file mode 100644 index 000000000000..9e3181cf2680 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/pytest/test_l2cap_server.py @@ -0,0 +1,893 @@ +# Copyright 2025 NXP +# +# SPDX-License-Identifier: Apache-2.0 +import asyncio +import sys + +from bumble.core import ProtocolError +from bumble.device import Device +from bumble.hci import ( + Address, + HCI_Write_Page_Timeout_Command, +) +from bumble.l2cap import ( + ClassicChannelSpec, +) +from bumble.pairing import PairingConfig, PairingDelegate +from bumble.snoop import BtSnooper +from bumble.transport import open_transport_or_link +from test_l2cap_common import ( + L2CAP_CHAN_IUT_ID, + L2CAP_SERVER_PSM, + MODE, + STRESS_TEST_MAX_COUNT, + Delegate, + bumble_acl_connect, + bumble_acl_disconnect, + device_power_on, + logger, + send_cmd_to_iut, + wait_for_shell_response, +) +from twister_harness import DeviceAdapter, Shell + + +async def l2cap_server_case_1(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_1 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br disconnect {L2CAP_CHAN_IUT_ID}", + f"Channel {L2CAP_CHAN_IUT_ID} disconnected", + ) + + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_2(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_2 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + l2cap_channel = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + await l2cap_channel.disconnect() + found, _ = await wait_for_shell_response( + dut, f"Channel {L2CAP_CHAN_IUT_ID} disconnected" + ) + assert found is True + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_3(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_3 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + lines = await send_cmd_to_iut(shell, dut, "bt disconnect", "Disconnected:") + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + logger.info(f'l2cap disconnect {L2CAP_CHAN_IUT_ID} successfully') + found = True + break + assert found is True + + +async def l2cap_server_case_4(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_4 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + found = True + break + assert found is True + + +async def l2cap_server_case_5(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_5 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br disconnect {L2CAP_CHAN_IUT_ID}", + f"Channel {L2CAP_CHAN_IUT_ID} disconnected", + ) + + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_6(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_6 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + l2cap_channel = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + await l2cap_channel.disconnect() + found, _ = await wait_for_shell_response( + dut, f"Channel {L2CAP_CHAN_IUT_ID} disconnected" + ) + assert found is True + + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_7(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_7 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + lines = await send_cmd_to_iut(shell, dut, "bt disconnect", "Disconnected:") + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + logger.info(f'l2cap disconnect {L2CAP_CHAN_IUT_ID} successfully') + found = True + break + assert found is True + + +async def l2cap_server_case_8(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_8 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + found = True + break + assert found is True + + +async def l2cap_server_case_9(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_9 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM, mtu=65535) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + found = True + break + assert found is True + + +async def l2cap_server_case_10(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_10 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM, mtu=48) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + found = True + break + assert found is True + + +async def l2cap_server_case_11(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_11 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + try: + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=0x1003)) + except ProtocolError as error: + logger.info(f'error_code = {error.error_code}') + assert ( + error.error_code == 0x2 + and error.error_name == 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED' + ) + + +async def l2cap_server_case_12(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_12 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + client0_received = [] + client1_received = [] + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + + assert connection is not None + + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br register 1003 {MODE}", + f"L2CAP psm {str(0x1003)} registered", + wait=False, + ) + + await device.authenticate(connection) + await device.encrypt(connection) + + l2cap_channel0 = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + + def on_client0_data(data): + client0_received.append(data) + + l2cap_channel0.sink = on_client0_data + l2cap_channel1 = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=0x1003) + ) + found, _ = await wait_for_shell_response(dut, "Channel 1 connected") + assert found is True + + def on_client1_data(data): + client1_received.append(data) + + l2cap_channel1.sink = on_client1_data + + data = "this is server case 13,bumble send data to iut" + data_ba = bytearray() + data_ba.extend(data.encode('utf-8')) + data_ba.append(0) + logger.info(f"test l2cap server recv data {data}") + l2cap_channel0.send_pdu(data_ba) + l2cap_channel1.send_pdu(data_ba) + found, lines = await wait_for_shell_response(dut, "Incoming data channel") + count = 0 + for line in lines: + if data in line: + count = count + 1 + assert count == 2 + + data = "this_is_server_case_13_iut_send_data_to_bumble" + logger.info(f"test l2cap send data {data}") + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br send {L2CAP_CHAN_IUT_ID} {data} {str(hex(len(data)))[2:]}", + None, + wait=False, + ) + await asyncio.sleep(0.5) + logger.info(f"l2cap_channel0 recv data = {client0_received}") + await send_cmd_to_iut( + shell, dut, f"l2cap_br send 1 {data} {str(hex(len(data)))[2:]}", None, wait=False + ) + await asyncio.sleep(0.5) + logger.info(f"l2cap_channel1 recv data = {client1_received}") + data_ba.clear() + data_ba.extend(data.encode('utf-8')) + recv_ba = bytearray() + for data in client0_received: + recv_ba.extend(data) + assert data_ba == recv_ba + recv_ba.clear() + for data in client1_received: + recv_ba.extend(data) + assert data_ba == recv_ba + + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found0 = False + found1 = False + for line in lines: + if line.find("Channel 0 disconnected") != -1: + found0 = True + elif line.find("Channel 1 disconnected") != -1: + found1 = True + assert found0 is True and found1 is True + + +async def l2cap_server_case_13(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_13 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + for _ in range(STRESS_TEST_MAX_COUNT): + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response( + dut, f"Channel {L2CAP_CHAN_IUT_ID} connected" + ) + assert found is True + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br disconnect {L2CAP_CHAN_IUT_ID}", + f"Channel {L2CAP_CHAN_IUT_ID} disconnected", + ) + + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_14(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_14 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + data_ba = bytearray() + data_recv = "this is server case 13,bumble send data to iut" + data_send = "this_is_server_case_13_iut_send_data_to_bumble" + + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + client_received = [] + + def on_client_data(data): + client_received.append(data) + + await device.authenticate(connection) + await device.encrypt(connection) + + l2cap_channel = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + l2cap_channel.sink = on_client_data + for _ in range(STRESS_TEST_MAX_COUNT): + logger.info(f"test l2cap server recv data {data_recv}") + data_ba.clear() + data_ba.extend(data_recv.encode('utf-8')) + data_ba.append(0) + l2cap_channel.send_pdu(data_ba) + found, _ = await wait_for_shell_response(dut, data_recv) + assert found is True + + logger.info(f"test l2cap server send data {data_send}") + client_received.clear() + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br send {L2CAP_CHAN_IUT_ID} {data_send} {str(hex(len(data_send)))[2:]}", + None, + wait=False, + ) + await asyncio.sleep(0.5) + data_ba.clear() + data_ba.extend(data_send.encode('utf-8')) + recv_ba = bytearray() + for data in client_received: + recv_ba.extend(data) + assert data_ba == recv_ba + + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br disconnect {L2CAP_CHAN_IUT_ID}", + f"Channel {L2CAP_CHAN_IUT_ID} disconnected", + ) + + await bumble_acl_disconnect(shell, dut, device, connection) + + +class TestL2capBRServer: + def test_l2cap_server_case_1(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test passive ACL connection,l2cap connect active + authenticate and active l2cap disconnect""" + logger.info(f'test_l2cap_server_case_1 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_1(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_2(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test passive ACL connection,l2cap connect + active authenticate and passive l2cap disconnect""" + logger.info(f'test_l2cap_server_case_2 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_2(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_3(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test passive ACL connection,l2cap connect active + authenticate and active acl disconnect.l2cap disconnect should be successfully.""" + logger.info(f'test_l2cap_server_case_3 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_3(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_4(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test passive ACL connection,l2cap connect active + authenticate and passive acl disconnect.l2cap disconnect should be successfully.""" + logger.info(f'test_l2cap_server_case_4 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_4(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_5(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test active ACL connection,l2cap connect active + authenticate and active l2cap disconnect""" + logger.info(f'test_l2cap_server_case_5 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_5(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_6(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test active ACL connection,l2cap connect active + authenticate and passive l2cap disconnect""" + logger.info(f'test_l2cap_server_case_6 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_6(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_7(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test active ACL connection,l2cap connect active + authenticate and active acl disconnect.l2cap disconnect should be successfully.""" + logger.info(f'test_l2cap_server_case_7 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_7(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_8(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test active ACL connection,l2cap connect active + authenticate and passive acl disconnect.l2cap disconnect should be successfully.""" + logger.info(f'test_l2cap_server_case_8 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_8(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_9(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test l2cap connection with max MTU(0xffff).But + the max mtu which the stack supports is (CONFIG_BT_BUF_ACL_RX_SIZE - 4U = 196).""" + logger.info(f'test_l2cap_server_case_9 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_9(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_10(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test l2cap connection with min MTU(0x30).""" + logger.info(f'test_l2cap_server_case_10 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_10(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_11(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test l2cap connection with invalid PSM.""" + logger.info(f'test_l2cap_server_case_11 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_11(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_12(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test l2cap multi_channel connection and data transfer. + Two PSM(0x1001, 0x1003) are registered in iut which is server""" + logger.info(f'test_l2cap_server_case_12 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_12(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_13(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Stress Test. Repeat l2cap connect, disconnect operation""" + logger.info(f'test_l2cap_server_case_13 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_13(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_14(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Stress Test. duplicate data transfer operation""" + logger.info(f'test_l2cap_server_case_14 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_14(hci, shell, dut, iut_address)) diff --git a/tests/bluetooth/classic/l2cap/src/l2cap_test.c b/tests/bluetooth/classic/l2cap/src/l2cap_test.c new file mode 100644 index 000000000000..c184f5bc2980 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/src/l2cap_test.c @@ -0,0 +1,543 @@ +/* sdp_client.c - Bluetooth classic SDP client smoke test */ + +/* + * Copyright 2024 NXP + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include "host/shell/bt.h" +#include "common/bt_shell_private.h" + +#define DATA_BREDR_MTU 48 + +NET_BUF_POOL_FIXED_DEFINE(data_tx_pool, 1, BT_L2CAP_SDU_BUF_SIZE(DATA_BREDR_MTU), + CONFIG_BT_CONN_TX_USER_DATA_SIZE, NULL); +NET_BUF_POOL_FIXED_DEFINE(data_rx_pool, 1, DATA_BREDR_MTU, 8, NULL); + +struct l2cap_br_chan { + struct bt_l2cap_br_chan chan; +#if defined(CONFIG_BT_L2CAP_RET_FC) + struct k_fifo l2cap_recv_fifo; + bool hold_credit; +#endif /* CONFIG_BT_L2CAP_RET_FC */ +}; + +struct app_l2cap_br_chan { + bool active; + uint8_t id; + struct bt_conn *conn; + struct l2cap_br_chan l2cap_chan; +}; + +struct bt_l2cap_br_server { + struct bt_l2cap_server server; +#if defined(CONFIG_BT_L2CAP_RET_FC) + uint8_t options; +#endif /* CONFIG_BT_L2CAP_RET_FC */ +}; + +#define BT_L2CAP_BR_SERVER_OPT_RET BIT(0) +#define BT_L2CAP_BR_SERVER_OPT_FC BIT(1) +#define BT_L2CAP_BR_SERVER_OPT_ERET BIT(2) +#define BT_L2CAP_BR_SERVER_OPT_STREAM BIT(3) +#define BT_L2CAP_BR_SERVER_OPT_MODE_OPTIONAL BIT(4) +#define BT_L2CAP_BR_SERVER_OPT_EXT_WIN_SIZE BIT(5) +#define BT_L2CAP_BR_SERVER_OPT_HOLD_CREDIT BIT(6) + +struct app_l2cap_br_server { + bool active; + uint8_t id; + struct bt_conn *conn; + struct bt_l2cap_br_server l2cap_server; +}; + +#define APPL_L2CAP_CONNECTION_MAX_COUNT 2 +struct app_l2cap_br_chan br_l2cap[APPL_L2CAP_CONNECTION_MAX_COUNT] = {0}; +struct app_l2cap_br_server br_l2cap_server[APPL_L2CAP_CONNECTION_MAX_COUNT] = {0}; + +static int l2cap_recv(struct bt_l2cap_chan *chan, struct net_buf *buf) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF(chan, struct l2cap_br_chan, chan.chan); + struct app_l2cap_br_chan *appl_br_chan = + CONTAINER_OF(br_chan, struct app_l2cap_br_chan, l2cap_chan); + + bt_shell_print("Incoming data channel %d len %u", appl_br_chan->id, buf->len); + + if (buf->len) { + printk("Incoming data :%.*s\r\n", buf->len, buf->data); + } + +#if defined(CONFIG_BT_L2CAP_RET_FC) + if (br_chan->hold_credit) { + k_fifo_put(&br_chan->l2cap_recv_fifo, buf); + return -EINPROGRESS; + } +#endif /* CONFIG_BT_L2CAP_RET_FC */ + (void)br_chan; + + return 0; +} + +static struct net_buf *l2cap_alloc_buf(struct bt_l2cap_chan *chan) +{ + bt_shell_print("Channel %p requires buffer", chan); + + return net_buf_alloc(&data_rx_pool, K_NO_WAIT); +} + +static void l2cap_connected(struct bt_l2cap_chan *chan) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF(chan, struct l2cap_br_chan, chan.chan); + struct app_l2cap_br_chan *appl_br_chan = + CONTAINER_OF(br_chan, struct app_l2cap_br_chan, l2cap_chan); + bt_shell_print("Channel %d connected", appl_br_chan->id); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + switch (br_chan->chan.rx.mode) { + case BT_L2CAP_BR_LINK_MODE_BASIC: + bt_shell_print("It is basic mode"); + if (br_chan->hold_credit) { + br_chan->hold_credit = false; + bt_shell_warn("hold_credit is unsupported in basic mode"); + } + break; + case BT_L2CAP_BR_LINK_MODE_RET: + bt_shell_print("It is retransmission mode"); + break; + case BT_L2CAP_BR_LINK_MODE_FC: + bt_shell_print("It is flow control mode"); + break; + case BT_L2CAP_BR_LINK_MODE_ERET: + bt_shell_print("It is enhance retransmission mode"); + break; + case BT_L2CAP_BR_LINK_MODE_STREAM: + bt_shell_print("It is streaming mode"); + break; + default: + bt_shell_error("It is unknown mode"); + break; + } +#endif /* CONFIG_BT_L2CAP_RET_FC */ + (void)br_chan; +} + +static void l2cap_disconnected(struct bt_l2cap_chan *chan) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF(chan, struct l2cap_br_chan, chan.chan); + struct app_l2cap_br_chan *appl_br_chan = + CONTAINER_OF(br_chan, struct app_l2cap_br_chan, l2cap_chan); + appl_br_chan->conn = NULL; + appl_br_chan->active = false; + + bt_shell_print("Channel %d disconnected", appl_br_chan->id); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + struct net_buf *buf; + + do { + buf = k_fifo_get(&br_chan->l2cap_recv_fifo, K_NO_WAIT); + if (buf != NULL) { + net_buf_unref(buf); + } + } while (buf != NULL); +#endif /* CONFIG_BT_L2CAP_RET_FC */ +} + +static const struct bt_l2cap_chan_ops l2cap_ops = { + .alloc_buf = l2cap_alloc_buf, + .recv = l2cap_recv, + .connected = l2cap_connected, + .disconnected = l2cap_disconnected, +}; + +struct app_l2cap_br_chan *appl_br_l2cap(struct bt_conn *conn) +{ + for (uint8_t index = 0; index < APPL_L2CAP_CONNECTION_MAX_COUNT; index++) { + if (br_l2cap[index].conn == NULL && br_l2cap[index].active == false) { + br_l2cap[index].conn = conn; + br_l2cap[index].active = true; + br_l2cap[index].id = index; + br_l2cap[index].l2cap_chan.chan.chan.ops = &l2cap_ops; + br_l2cap[index].l2cap_chan.chan.rx.mtu = DATA_BREDR_MTU; +#if defined(CONFIG_BT_L2CAP_RET_FC) + k_fifo_init(&br_l2cap[index].l2cap_chan.l2cap_recv_fifo); +#endif + return &br_l2cap[index]; + } + } + return NULL; +} + +static int l2cap_accept(struct bt_conn *conn, struct bt_l2cap_server *server, + struct bt_l2cap_chan **chan) +{ + struct bt_l2cap_br_server *br_server; + struct app_l2cap_br_server *appl_l2cap_server; + struct app_l2cap_br_chan *appl_l2cap = NULL; + struct l2cap_br_chan *l2cap_chan = NULL; + + br_server = CONTAINER_OF(server, struct bt_l2cap_br_server, server); + appl_l2cap_server = CONTAINER_OF(br_server, struct app_l2cap_br_server, l2cap_server); + + appl_l2cap = appl_br_l2cap(conn); + if (!appl_l2cap) { + bt_shell_error("No channels application br chan"); + return -ENOMEM; + } + l2cap_chan = &appl_l2cap->l2cap_chan; + *chan = &l2cap_chan->chan.chan; + + bt_shell_print("Incoming BR/EDR conn %p", conn); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + if (br_server->options & BT_L2CAP_BR_SERVER_OPT_HOLD_CREDIT) { + l2cap_chan->hold_credit = true; + } else { + l2cap_chan->hold_credit = false; + } + + if (br_server->options & BT_L2CAP_BR_SERVER_OPT_EXT_WIN_SIZE) { + l2cap_chan->chan.rx.extended_control = true; + } else { + l2cap_chan->chan.rx.extended_control = false; + } + + if (br_server->options & BT_L2CAP_BR_SERVER_OPT_MODE_OPTIONAL) { + l2cap_chan->chan.rx.optional = true; + } else { + l2cap_chan->chan.rx.optional = false; + } + + l2cap_chan->chan.rx.fcs = BT_L2CAP_BR_FCS_16BIT; + + if (br_server->options & BT_L2CAP_BR_SERVER_OPT_STREAM) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_STREAM; + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; + l2cap_chan->chan.rx.max_transmit = 0; + } else if (br_server->options & BT_L2CAP_BR_SERVER_OPT_ERET) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_ERET; + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (br_server->options & BT_L2CAP_BR_SERVER_OPT_FC) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_FC; + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (br_server->options & BT_L2CAP_BR_SERVER_OPT_RET) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_RET; + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; + l2cap_chan->chan.rx.max_transmit = 3; + } +#endif /* CONFIG_BT_L2CAP_RET_FC */ + (void)br_server; + return 0; +} + +struct app_l2cap_br_server *appl_br_l2cap_server_alloc(uint16_t psm) +{ + for (uint8_t index = 0; index < APPL_L2CAP_CONNECTION_MAX_COUNT; index++) { + if (br_l2cap_server[index].conn == NULL && br_l2cap_server[index].active == false) { + br_l2cap_server[index].active = true; + br_l2cap_server[index].id = index; + br_l2cap_server[index].l2cap_server.server.psm = psm; + br_l2cap_server[index].l2cap_server.server.accept = l2cap_accept; + return &br_l2cap_server[index]; + } + } + return NULL; +} + +static int cmd_connect(const struct shell *sh, size_t argc, char *argv[]) +{ + uint16_t psm; + struct bt_conn_info info; + int err; + struct app_l2cap_br_chan *appl_l2cap = NULL; + struct l2cap_br_chan *l2cap_chan = NULL; + + if (!default_conn) { + shell_error(sh, "Not connected"); + return -ENOEXEC; + } + + appl_l2cap = appl_br_l2cap(default_conn); + if (!appl_l2cap) { + bt_shell_error("No channels application br chan"); + return -ENOMEM; + } + l2cap_chan = &appl_l2cap->l2cap_chan; + if (l2cap_chan->chan.chan.conn) { + bt_shell_error("No channels available"); + return -ENOMEM; + } + + err = bt_conn_get_info(default_conn, &info); + if ((err < 0) || (info.type != BT_CONN_TYPE_BR)) { + shell_error(sh, "Invalid conn type"); + return -ENOEXEC; + } + + psm = strtoul(argv[1], NULL, 16); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + if (!strcmp(argv[2], "basic")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_BASIC; + } else if (!strcmp(argv[2], "ret")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_RET; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (!strcmp(argv[2], "fc")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_FC; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (!strcmp(argv[2], "eret")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_ERET; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (!strcmp(argv[2], "stream")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_STREAM; + l2cap_chan->chan.rx.max_transmit = 0; + } else { + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + + l2cap_chan->hold_credit = false; + l2cap_chan->chan.rx.optional = false; + l2cap_chan->chan.rx.extended_control = false; + + for (size_t index = 3; index < argc; index++) { + if (!strcmp(argv[index], "hold_credit")) { + l2cap_chan->hold_credit = true; + } else if (!strcmp(argv[index], "mode_optional")) { + l2cap_chan->chan.rx.optional = true; + } else if (!strcmp(argv[index], "extended_control")) { + l2cap_chan->chan.rx.extended_control = true; + } else if (!strcmp(argv[index], "sec")) { + l2cap_chan->chan.required_sec_level = strtoul(argv[++index], NULL, 16); + } else if (!strcmp(argv[index], "mtu")) { + l2cap_chan->chan.rx.mtu = strtoul(argv[++index], NULL, 16); + } else { + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + } + + if ((l2cap_chan->chan.rx.extended_control) && + ((l2cap_chan->chan.rx.mode != BT_L2CAP_BR_LINK_MODE_ERET) && + (l2cap_chan->chan.rx.mode != BT_L2CAP_BR_LINK_MODE_STREAM))) { + shell_error(sh, "[extended_control] only supports mode eret and stream"); + return -ENOEXEC; + } + + if (l2cap_chan->hold_credit && (l2cap_chan->chan.rx.mode == BT_L2CAP_BR_LINK_MODE_BASIC)) { + shell_error(sh, "[hold_credit] cannot support basic mode"); + return -ENOEXEC; + } + + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; +#endif /* CONFIG_BT_L2CAP_RET_FC */ + + err = bt_l2cap_chan_connect(default_conn, &l2cap_chan->chan.chan, psm); + if (err < 0) { + shell_error(sh, "Unable to connect to psm %u (err %d)", psm, err); + appl_l2cap->conn = NULL; + appl_l2cap->active = false; + } else { + shell_print(sh, "L2CAP connection pending"); + } + + return err; +} + +static int cmd_l2cap_disconnect(const struct shell *sh, size_t argc, char *argv[]) +{ + int err; + uint8_t id; + + id = strtoul(argv[1], NULL, 16); + + if (br_l2cap[id].active == true) { + err = bt_l2cap_chan_disconnect(&br_l2cap[id].l2cap_chan.chan.chan); + if (err) { + shell_error(sh, "Unable to disconnect: %u", -err); + return err; + } + } + + return 0; +} + +static int cmd_l2cap_send(const struct shell *sh, size_t argc, char *argv[]) +{ + int err, mtu_len = DATA_BREDR_MTU, data_len = 0; + uint8_t id; + struct net_buf *buf; + uint16_t len; + uint16_t send_length = 0, remaining_data = 0; + struct l2cap_br_chan *l2cap_chan = NULL; + + id = strtoul(argv[1], NULL, 16); + data_len = strtoul(argv[3], NULL, 16); + mtu_len = MIN(l2cap_chan->chan.tx.mtu, DATA_BREDR_MTU); + + shell_print(sh, "data_len = %d", data_len); + + if (br_l2cap[id].active == true) { + l2cap_chan = &br_l2cap[id].l2cap_chan; + remaining_data = data_len; + while (remaining_data > 0) { + buf = net_buf_alloc(&data_tx_pool, K_SECONDS(2)); + if (!buf) { + if (l2cap_chan->chan.state != BT_L2CAP_CONNECTED) { + shell_error(sh, "Channel disconnected, stopping TX"); + return -EAGAIN; + } + shell_error(sh, "Allocation timeout, stopping TX"); + return -EAGAIN; + } + net_buf_reserve(buf, BT_L2CAP_CHAN_SEND_RESERVE); + if (remaining_data > mtu_len) { + net_buf_add_mem(buf, argv[2] + send_length, mtu_len); + len = mtu_len; + } else { + net_buf_add_mem(buf, argv[2] + send_length, remaining_data); + len = remaining_data; + } + err = bt_l2cap_chan_send(&l2cap_chan->chan.chan, buf); + if (err < 0) { + shell_error(sh, "Unable to send: %d", -err); + net_buf_unref(buf); + return -ENOEXEC; + } + send_length += len; + remaining_data -= len; + } + } else { + shell_print(sh, "channel %d not support", id); + return -EINVAL; + } + return 0; +} + +bool l2cap_psm_registered(uint16_t psm) +{ + for (uint8_t index = 0; index < APPL_L2CAP_CONNECTION_MAX_COUNT; index++) { + if (br_l2cap_server[index].active == true && + br_l2cap_server[index].l2cap_server.server.psm == psm) { + return true; + } + } + return false; +} +static int cmd_l2cap_register(const struct shell *sh, size_t argc, char *argv[]) +{ + uint16_t psm = strtoul(argv[1], NULL, 16); + struct app_l2cap_br_server *app_l2cap_server; + struct bt_l2cap_br_server *l2cap_server; + + if (l2cap_psm_registered(psm)) { + shell_print(sh, "Already registered"); + return -ENOEXEC; + } + + app_l2cap_server = appl_br_l2cap_server_alloc(psm); + if (!app_l2cap_server) { + bt_shell_error("No channels application br chan"); + return -ENOMEM; + } + l2cap_server = &app_l2cap_server->l2cap_server; + l2cap_server->server.psm = strtoul(argv[1], NULL, 16); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + l2cap_server->options = 0; + + if (!strcmp(argv[2], "basic")) { + /* Support mode: None */ + } else if (!strcmp(argv[2], "ret")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_RET; + } else if (!strcmp(argv[2], "fc")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_FC; + } else if (!strcmp(argv[2], "eret")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_ERET; + } else if (!strcmp(argv[2], "stream")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_STREAM; + } else { + l2cap_server->server.psm = 0; + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + + for (size_t index = 3; index < argc; index++) { + if (!strcmp(argv[index], "hold_credit")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_HOLD_CREDIT; + } else if (!strcmp(argv[index], "mode_optional")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_MODE_OPTIONAL; + } else if (!strcmp(argv[index], "extended_control")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_EXT_WIN_SIZE; + } else if (!strcmp(argv[index], "sec")) { + l2cap_server->server.sec_level = strtoul(argv[++index], NULL, 16); + } else { + l2cap_server->server.psm = 0; + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + } + + if ((l2cap_server->options & BT_L2CAP_BR_SERVER_OPT_EXT_WIN_SIZE) && + (!(l2cap_server->options & + (BT_L2CAP_BR_SERVER_OPT_ERET | BT_L2CAP_BR_SERVER_OPT_STREAM)))) { + shell_error(sh, "[extended_control] only supports mode eret and stream"); + l2cap_server->server.psm = 0U; + return -ENOEXEC; + } +#endif /* CONFIG_BT_L2CAP_RET_FC */ + + if (bt_l2cap_br_server_register(&l2cap_server->server) < 0) { + shell_error(sh, "Unable to register psm"); + l2cap_server->server.psm = 0U; + return -ENOEXEC; + } + + shell_print(sh, "L2CAP psm %u registered", l2cap_server->server.psm); + + return 0; +} + +SHELL_STATIC_SUBCMD_SET_CREATE( + l2cap_br_cmds, + SHELL_CMD_ARG(register, NULL, " [option]", cmd_l2cap_register, 2, 5), + SHELL_CMD_ARG(connect, NULL, " [option]", cmd_connect, 2, 3), + SHELL_CMD_ARG(disconnect, NULL, "[id]", cmd_l2cap_disconnect, 2, 0), + SHELL_CMD_ARG(send, NULL, "[id] [length of data] [data] ", cmd_l2cap_send, 4, 0), + SHELL_SUBCMD_SET_END); + +static int cmd_default_handler(const struct shell *sh, size_t argc, char **argv) +{ + if (argc == 1) { + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + + shell_error(sh, "%s unknown parameter: %s", argv[0], argv[1]); + + return -EINVAL; +} + +SHELL_CMD_REGISTER(l2cap_br, &l2cap_br_cmds, "Bluetooth classic l2cap shell commands", + cmd_default_handler); diff --git a/tests/bluetooth/classic/l2cap/testcase.yaml b/tests/bluetooth/classic/l2cap/testcase.yaml new file mode 100644 index 000000000000..3a168d312496 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/testcase.yaml @@ -0,0 +1,24 @@ +tests: + bluetooth.classic.l2cap: + platform_allow: + - native_sim + integration_platforms: + - native_sim + tags: + - bluetooth + - l2cap + harness: pytest + harness_config: + pytest_dut_scope: session + fixture: usb_hci + timeout: 480 + bluetooth.classic.l2cap.no_blobs: + platform_allow: + - mimxrt1170_evk@B/mimxrt1176/cm7 + tags: + - bluetooth + - l2cap + extra_args: + - CONFIG_BUILD_ONLY_NO_BLOBS=y + timeout: 480 + build_only: true