From e0b6117d84c6664c85abf24c9cab941050955b47 Mon Sep 17 00:00:00 2001 From: Cheng Chang Date: Tue, 22 Apr 2025 14:14:35 +0800 Subject: [PATCH] SHA-1: 606d9e1036d09b473013b2cc3768c4e10e053765 * tests: bluetooth: classic: Add test suite l2cap. IUT works as a l2cap server with basic mode. The peer device, l2cap client with basic mode, is a PC with running `bumble` on it. This test only performs the function of L2CAP basic mode. Support multiple l2cap enerties in new shell l2cap_br, which may support more test function. In the test suite , there are two groups in test cases. Group 1 Including case1-case8 focuses on connection and disconnection around l2cap. The impact of active and passive acl connectivity, disconnectivity and authentication as well as disconnection from ACL without l2cap disconnect is tested. Group2 Including case9-case14 revolves around the basic parameters of L2CAPserver configuration, data transfer. Case 9: Test l2cap connection with max MTU(0xffff).But the max mtu which the stack supports is (CONFIG_BT_BUF_ACL_RX_SIZE - 4U = 196). Case 10: Test l2cap connection with min MTU(0x30). Case 11: Test l2cap connection with invaild PSM. Case 12: Test l2cap multi_channel connection and data tranfer. Case 13: Stress Test. Repeat l2cap connect, disconnect operation. Case 14: Stress Test. Repeat data transfer in a single connection. In Case 13 and 14, if enlarging STRESS_TEST_MAX_COUNT and test fail, you can enlarge timeout in testcase.yml. test_l2cap_server.py is a file containing real test cases. test_l2cap_common.py is a common file. It encapsulates some test function functions that are commonly used for test cases. Signed-off-by: Cheng Chang --- tests/bluetooth/classic/l2cap/CMakeLists.txt | 10 + tests/bluetooth/classic/l2cap/README.rst | 74 ++ .../mimxrt1170_evk_mimxrt1176_cm7_B.conf | 10 + .../mimxrt1170_evk_mimxrt1176_cm7_B.overlay | 11 + tests/bluetooth/classic/l2cap/prj.conf | 17 + .../classic/l2cap/pytest/conftest.py | 57 ++ .../classic/l2cap/pytest/test_l2cap_common.py | 115 +++ .../classic/l2cap/pytest/test_l2cap_server.py | 893 ++++++++++++++++++ .../bluetooth/classic/l2cap/src/l2cap_test.c | 543 +++++++++++ tests/bluetooth/classic/l2cap/testcase.yaml | 24 + 10 files changed, 1754 insertions(+) create mode 100644 tests/bluetooth/classic/l2cap/CMakeLists.txt create mode 100644 tests/bluetooth/classic/l2cap/README.rst create mode 100644 tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf create mode 100644 tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay create mode 100644 tests/bluetooth/classic/l2cap/prj.conf create mode 100644 tests/bluetooth/classic/l2cap/pytest/conftest.py create mode 100644 tests/bluetooth/classic/l2cap/pytest/test_l2cap_common.py create mode 100644 tests/bluetooth/classic/l2cap/pytest/test_l2cap_server.py create mode 100644 tests/bluetooth/classic/l2cap/src/l2cap_test.c create mode 100644 tests/bluetooth/classic/l2cap/testcase.yaml diff --git a/tests/bluetooth/classic/l2cap/CMakeLists.txt b/tests/bluetooth/classic/l2cap/CMakeLists.txt new file mode 100644 index 000000000000..89de0b86386a --- /dev/null +++ b/tests/bluetooth/classic/l2cap/CMakeLists.txt @@ -0,0 +1,10 @@ +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +set(NO_QEMU_SERIAL_BT_SERVER 1) + +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(bluetooth) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/bluetooth/classic/l2cap/README.rst b/tests/bluetooth/classic/l2cap/README.rst new file mode 100644 index 000000000000..6dade18ffaf2 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/README.rst @@ -0,0 +1,74 @@ +.. _bluetooth_classic_l2cap_server_tests: + +Bluetooth Classic L2cap Server Tests +################################## + +Overview +******** + +This test suite uses ``bumble`` for testing Bluetooth Classic communication between a host +PC (running :ref:`Twister `) and a device under test (DUT) running Zephyr. + +Prerequisites +************* + +The test suite has the following prerequisites: + +* The ``bumble`` library installed on the host PC. +The Bluetooth Classic controller on PC side is required. Refer to getting started of `bumble`_ +for details. + +The HCI transport for ``bumble`` can be configured as follows: + +* A specific configuration context can be provided along with the ``usb_hci`` fixture separated by + a ``:`` (i.e. specify fixture ``usb_hci:usb:0`` to use the ``usb:0`` as hci transport for + ``bumble``). +* The configuration context can be overridden using the `hci transport`_ can be provided using the + ``--hci-transport`` test suite argument (i.e. run ``twister`` with the + ``--pytest-args=--hci-transport=usb:0`` argument to use the ``usb:0`` as hci transport for + ``bumble``). + +Building and Running +******************** + +Running on mimxrt1170_evk@B/mimxrt1176/cm7 +========================================== + +Running the test suite on :ref:`mimxrt1170_evk` relies on configuration of ``bumble``. + +On the host PC, a HCI transport needs to be required. Refer to `bumble platforms`_ page of +``bumble`` for details. + +For example, on windows, a PTS dongle is used. After `WinUSB driver`_ has been installed, +the HCI transport would be USB transport interface ``usb:``. + +If the HCI transport is ``usb:0`` and debug console port is ``COM4``, the test suite can be +launched using Twister: + +.. code-block:: shell + + west twister -v -p mimxrt1170_evk@B/mimxrt1176/cm7 --device-testing --device-serial COM4 -T tests/bluetooth/classic/l2cap -O l2cap_s --force-platform --west-flash --west-runner=jlink -X usb_hci:usb:0 + +Running on Hardware +=================== + +Running the test suite on hardware requires a HCI transport connected to the host PC. + +The test suite can be launched using Twister. Below is an example for running on the +:zephyr:board:`mimxrt1170_evk@B/mimxrt1176/cm7`: + +.. code-block:: shell + + west twister -v -p mimxrt1170_evk@B/mimxrt1176/cm7 --device-testing --device-serial COM4 -T tests/bluetooth/classic/l2cap -O l2cap_s --force-platform --west-flash --west-runner=jlink -X usb_hci:usb:0 + +.. _bumble: + https://google.github.io/bumble/getting_started.html + +.. _hci transport: + https://google.github.io/bumble/transports/index.html + +.. _bumble platforms: + https://google.github.io/bumble/platforms/index.html + +.. _WinUSB driver: + https://google.github.io/bumble/platforms/windows.html diff --git a/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf b/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf new file mode 100644 index 000000000000..289b5a597386 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf @@ -0,0 +1,10 @@ +#select NXP NW612 Chipset +CONFIG_BT_NXP_NW612=y + +CONFIG_BT_SETTINGS=n +CONFIG_FLASH=n +CONFIG_FLASH_MAP=n +CONFIG_NVS=n +CONFIG_SETTINGS=n + +CONFIG_ENTROPY_GENERATOR=y diff --git a/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay b/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay new file mode 100644 index 000000000000..96ef63ad60b2 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay @@ -0,0 +1,11 @@ +/* + * Copyright 2024 NXP + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/ { + chosen { + zephyr,sram = &dtcm; + }; +}; diff --git a/tests/bluetooth/classic/l2cap/prj.conf b/tests/bluetooth/classic/l2cap/prj.conf new file mode 100644 index 000000000000..2230d1bcbe77 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/prj.conf @@ -0,0 +1,17 @@ +CONFIG_BT=y +CONFIG_BT_CLASSIC=y +CONFIG_BT_SHELL=y +CONFIG_LOG=y +CONFIG_DEBUG=y +CONFIG_ZTEST=y +CONFIG_BT_L2CAP_DYNAMIC_CHANNEL=y +CONFIG_BT_L2CAP_RET=y +CONFIG_BT_L2CAP_FC=y +CONFIG_BT_L2CAP_ENH_RET=y +CONFIG_BT_L2CAP_STREAM=y +CONFIG_BT_L2CAP_FCS=y +CONFIG_BT_L2CAP_EXT_WIN_SIZE=y +CONFIG_BT_L2CAP_MAX_WINDOW_SIZE=5 +CONFIG_BT_DEVICE_NAME="L2CAP_BR" +CONFIG_BT_CREATE_CONN_TIMEOUT=30 +CONFIG_BT_PAGE_TIMEOUT=0xFFFF diff --git a/tests/bluetooth/classic/l2cap/pytest/conftest.py b/tests/bluetooth/classic/l2cap/pytest/conftest.py new file mode 100644 index 000000000000..f9ad92a35435 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/pytest/conftest.py @@ -0,0 +1,57 @@ +# Copyright 2024 NXP +# +# SPDX-License-Identifier: Apache-2.0 +import re + +import pytest +from test_l2cap_common import L2CAP_SERVER_PSM, MODE, logger +from twister_harness import DeviceAdapter, Shell + + +def pytest_addoption(parser) -> None: + """Add local parser options to pytest.""" + parser.addoption('--hci-transport', default=None, help='Configuration HCI transport for bumble') + + +@pytest.fixture(name='initialize', scope='session') +def fixture_initialize(request, shell: Shell, dut: DeviceAdapter): + """Session initializtion""" + # Get HCI transport for bumble + hci = request.config.getoption('--hci-transport') + + if hci is None: + for fixture in dut.device_config.fixtures: + if fixture.startswith('usb_hci:'): + hci = fixture.split(sep=':', maxsplit=1)[1] + break + + assert hci is not None + + lines = shell.exec_command("bt init") + lines = dut.readlines_until("Bluetooth initialized") + regex = r"Identity: (?P(.*\s\(.*\)))" + bd_addr = None + for line in lines: + logger.info(f"Shell log {line}") + m = re.search(regex, line) + if m: + bd_addr = m.group('bd_addr') + + if bd_addr is None: + logger.error('Fail to get IUT BD address') + raise AssertionError + + lines = shell.exec_command("br pscan on") + lines = shell.exec_command("br iscan on") + logger.info('initialized') + + lines = shell.exec_command(f"l2cap_br register {format(L2CAP_SERVER_PSM, 'x')} {MODE}") + logger.info("l2cap server register") + return hci, bd_addr + + +@pytest.fixture +def l2cap_br_dut(initialize): + logger.info('Start running testcase') + yield initialize + logger.info('Done') diff --git a/tests/bluetooth/classic/l2cap/pytest/test_l2cap_common.py b/tests/bluetooth/classic/l2cap/pytest/test_l2cap_common.py new file mode 100644 index 000000000000..c4d606e5935e --- /dev/null +++ b/tests/bluetooth/classic/l2cap/pytest/test_l2cap_common.py @@ -0,0 +1,115 @@ +# Copyright 2025 NXP +# +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import logging + +from bumble.core import BT_BR_EDR_TRANSPORT +from bumble.hci import ( + HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, +) +from bumble.pairing import PairingDelegate + +logger = logging.getLogger(__name__) + +L2CAP_SERVER_PSM = 0x1001 +MODE = "basic" +L2CAP_CHAN_IUT_ID = 0 +MAX_MTU = 0xFFFF +MIN_MTU = 0x30 +STRESS_TEST_MAX_COUNT = 50 + + +class Delegate(PairingDelegate): + def __init__( + self, + dut, + io_capability, + ): + super().__init__( + io_capability, + ) + self.dut = dut + + +async def device_power_on(device) -> None: + while True: + try: + await device.power_on() + break + except Exception: + continue + + +async def wait_for_shell_response(dut, message): + found = False + lines = [] + try: + while found is False: + read_lines = dut.readlines() + for line in read_lines: + if message in line: + found = True + break + lines = lines + read_lines + await asyncio.sleep(0.1) + except Exception as e: + logger.error(f'{e}!', exc_info=True) + raise e + return found, lines + + +async def send_cmd_to_iut(shell, dut, cmd, parse=None, wait=True): + found = False + lines = shell.exec_command(cmd) + if wait: + if parse is not None: + found, lines = await wait_for_shell_response(dut, parse) + else: + found = True + else: + found = False + if parse is not None: + for line in lines: + if parse in line: + found = True + break + else: + found = True + logger.info(f'{lines}') + assert found is True + return lines + + +async def bumble_acl_connect(shell, dut, device, target_address): + connection = None + try: + connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) + logger.info(f'=== Connected to {connection.peer_address}!') + except Exception as e: + logger.error(f'Fail to connect to {target_address}!') + raise e + return connection + + +async def bumble_acl_disconnect(shell, dut, device, connection): + await device.disconnect( + connection, reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR + ) + found, lines = await wait_for_shell_response(dut, "Disconnected:") + logger.info(f'lines : {lines}') + assert found is True + return found, lines + + +async def bumble_l2cap_disconnect(shell, dut, incoming_channel, iut_id=L2CAP_CHAN_IUT_ID): + try: + await incoming_channel.disconnect() + except Exception as e: + logger.error('Fail to send l2cap disconnect command!') + raise e + assert incoming_channel.disconnection_result is None + + found, _ = await wait_for_shell_response(dut, f"Channel {iut_id} disconnected") + assert found is True diff --git a/tests/bluetooth/classic/l2cap/pytest/test_l2cap_server.py b/tests/bluetooth/classic/l2cap/pytest/test_l2cap_server.py new file mode 100644 index 000000000000..9e3181cf2680 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/pytest/test_l2cap_server.py @@ -0,0 +1,893 @@ +# Copyright 2025 NXP +# +# SPDX-License-Identifier: Apache-2.0 +import asyncio +import sys + +from bumble.core import ProtocolError +from bumble.device import Device +from bumble.hci import ( + Address, + HCI_Write_Page_Timeout_Command, +) +from bumble.l2cap import ( + ClassicChannelSpec, +) +from bumble.pairing import PairingConfig, PairingDelegate +from bumble.snoop import BtSnooper +from bumble.transport import open_transport_or_link +from test_l2cap_common import ( + L2CAP_CHAN_IUT_ID, + L2CAP_SERVER_PSM, + MODE, + STRESS_TEST_MAX_COUNT, + Delegate, + bumble_acl_connect, + bumble_acl_disconnect, + device_power_on, + logger, + send_cmd_to_iut, + wait_for_shell_response, +) +from twister_harness import DeviceAdapter, Shell + + +async def l2cap_server_case_1(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_1 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br disconnect {L2CAP_CHAN_IUT_ID}", + f"Channel {L2CAP_CHAN_IUT_ID} disconnected", + ) + + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_2(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_2 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + l2cap_channel = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + await l2cap_channel.disconnect() + found, _ = await wait_for_shell_response( + dut, f"Channel {L2CAP_CHAN_IUT_ID} disconnected" + ) + assert found is True + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_3(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_3 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + lines = await send_cmd_to_iut(shell, dut, "bt disconnect", "Disconnected:") + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + logger.info(f'l2cap disconnect {L2CAP_CHAN_IUT_ID} successfully') + found = True + break + assert found is True + + +async def l2cap_server_case_4(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_4 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + found = True + break + assert found is True + + +async def l2cap_server_case_5(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_5 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br disconnect {L2CAP_CHAN_IUT_ID}", + f"Channel {L2CAP_CHAN_IUT_ID} disconnected", + ) + + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_6(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_6 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + l2cap_channel = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + await l2cap_channel.disconnect() + found, _ = await wait_for_shell_response( + dut, f"Channel {L2CAP_CHAN_IUT_ID} disconnected" + ) + assert found is True + + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_7(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_7 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + lines = await send_cmd_to_iut(shell, dut, "bt disconnect", "Disconnected:") + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + logger.info(f'l2cap disconnect {L2CAP_CHAN_IUT_ID} successfully') + found = True + break + assert found is True + + +async def l2cap_server_case_8(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_8 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + found = True + break + assert found is True + + +async def l2cap_server_case_9(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_9 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM, mtu=65535) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + found = True + break + assert found is True + + +async def l2cap_server_case_10(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_10 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM, mtu=48) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found = False + for line in lines: + if f"Channel {L2CAP_CHAN_IUT_ID} disconnected" in line: + found = True + break + assert found is True + + +async def l2cap_server_case_11(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_11 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + + assert connection is not None + + await device.authenticate(connection) + await device.encrypt(connection) + + try: + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=0x1003)) + except ProtocolError as error: + logger.info(f'error_code = {error.error_code}') + assert ( + error.error_code == 0x2 + and error.error_name == 'CONNECTION_REFUSED_PSM_NOT_SUPPORTED' + ) + + +async def l2cap_server_case_12(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_12 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + client0_received = [] + client1_received = [] + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + address = device.public_address.to_string().split('/P')[0] + logger.info(f"address = {address}") + + await send_cmd_to_iut(shell, dut, f"br connect {address}", "Connected:") + for conn in device.connections.values(): + if conn.self_address.to_string().split('/P')[0] == address.split(" ")[0]: + connection = conn + break + + assert connection is not None + + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br register 1003 {MODE}", + f"L2CAP psm {str(0x1003)} registered", + wait=False, + ) + + await device.authenticate(connection) + await device.encrypt(connection) + + l2cap_channel0 = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + + def on_client0_data(data): + client0_received.append(data) + + l2cap_channel0.sink = on_client0_data + l2cap_channel1 = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=0x1003) + ) + found, _ = await wait_for_shell_response(dut, "Channel 1 connected") + assert found is True + + def on_client1_data(data): + client1_received.append(data) + + l2cap_channel1.sink = on_client1_data + + data = "this is server case 13,bumble send data to iut" + data_ba = bytearray() + data_ba.extend(data.encode('utf-8')) + data_ba.append(0) + logger.info(f"test l2cap server recv data {data}") + l2cap_channel0.send_pdu(data_ba) + l2cap_channel1.send_pdu(data_ba) + found, lines = await wait_for_shell_response(dut, "Incoming data channel") + count = 0 + for line in lines: + if data in line: + count = count + 1 + assert count == 2 + + data = "this_is_server_case_13_iut_send_data_to_bumble" + logger.info(f"test l2cap send data {data}") + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br send {L2CAP_CHAN_IUT_ID} {data} {str(hex(len(data)))[2:]}", + None, + wait=False, + ) + await asyncio.sleep(0.5) + logger.info(f"l2cap_channel0 recv data = {client0_received}") + await send_cmd_to_iut( + shell, dut, f"l2cap_br send 1 {data} {str(hex(len(data)))[2:]}", None, wait=False + ) + await asyncio.sleep(0.5) + logger.info(f"l2cap_channel1 recv data = {client1_received}") + data_ba.clear() + data_ba.extend(data.encode('utf-8')) + recv_ba = bytearray() + for data in client0_received: + recv_ba.extend(data) + assert data_ba == recv_ba + recv_ba.clear() + for data in client1_received: + recv_ba.extend(data) + assert data_ba == recv_ba + + found, lines = await bumble_acl_disconnect(shell, dut, device, connection) + found0 = False + found1 = False + for line in lines: + if line.find("Channel 0 disconnected") != -1: + found0 = True + elif line.find("Channel 1 disconnected") != -1: + found1 = True + assert found0 is True and found1 is True + + +async def l2cap_server_case_13(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_13 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + + await device.authenticate(connection) + await device.encrypt(connection) + + for _ in range(STRESS_TEST_MAX_COUNT): + await connection.create_l2cap_channel(spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM)) + found, _ = await wait_for_shell_response( + dut, f"Channel {L2CAP_CHAN_IUT_ID} connected" + ) + assert found is True + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br disconnect {L2CAP_CHAN_IUT_ID}", + f"Channel {L2CAP_CHAN_IUT_ID} disconnected", + ) + + await bumble_acl_disconnect(shell, dut, device, connection) + + +async def l2cap_server_case_14(hci_port, shell, dut, address) -> None: + logger.info('<<< l2cap_server_case_14 ...') + + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + device.classic_enabled = True + device.le_enabled = False + delegate = Delegate(dut, PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=True, bonding=True, delegate=delegate + ) + + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + data_ba = bytearray() + data_recv = "this is server case 13,bumble send data to iut" + data_send = "this_is_server_case_13_iut_send_data_to_bumble" + + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + + await send_cmd_to_iut(shell, dut, "br clear all", None) + + target_address = address.split(" ")[0] + connection = await bumble_acl_connect(shell, dut, device, target_address) + client_received = [] + + def on_client_data(data): + client_received.append(data) + + await device.authenticate(connection) + await device.encrypt(connection) + + l2cap_channel = await connection.create_l2cap_channel( + spec=ClassicChannelSpec(psm=L2CAP_SERVER_PSM) + ) + found, _ = await wait_for_shell_response(dut, f"Channel {L2CAP_CHAN_IUT_ID} connected") + assert found is True + l2cap_channel.sink = on_client_data + for _ in range(STRESS_TEST_MAX_COUNT): + logger.info(f"test l2cap server recv data {data_recv}") + data_ba.clear() + data_ba.extend(data_recv.encode('utf-8')) + data_ba.append(0) + l2cap_channel.send_pdu(data_ba) + found, _ = await wait_for_shell_response(dut, data_recv) + assert found is True + + logger.info(f"test l2cap server send data {data_send}") + client_received.clear() + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br send {L2CAP_CHAN_IUT_ID} {data_send} {str(hex(len(data_send)))[2:]}", + None, + wait=False, + ) + await asyncio.sleep(0.5) + data_ba.clear() + data_ba.extend(data_send.encode('utf-8')) + recv_ba = bytearray() + for data in client_received: + recv_ba.extend(data) + assert data_ba == recv_ba + + await send_cmd_to_iut( + shell, + dut, + f"l2cap_br disconnect {L2CAP_CHAN_IUT_ID}", + f"Channel {L2CAP_CHAN_IUT_ID} disconnected", + ) + + await bumble_acl_disconnect(shell, dut, device, connection) + + +class TestL2capBRServer: + def test_l2cap_server_case_1(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test passive ACL connection,l2cap connect active + authenticate and active l2cap disconnect""" + logger.info(f'test_l2cap_server_case_1 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_1(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_2(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test passive ACL connection,l2cap connect + active authenticate and passive l2cap disconnect""" + logger.info(f'test_l2cap_server_case_2 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_2(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_3(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test passive ACL connection,l2cap connect active + authenticate and active acl disconnect.l2cap disconnect should be successfully.""" + logger.info(f'test_l2cap_server_case_3 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_3(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_4(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test passive ACL connection,l2cap connect active + authenticate and passive acl disconnect.l2cap disconnect should be successfully.""" + logger.info(f'test_l2cap_server_case_4 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_4(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_5(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test active ACL connection,l2cap connect active + authenticate and active l2cap disconnect""" + logger.info(f'test_l2cap_server_case_5 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_5(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_6(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test active ACL connection,l2cap connect active + authenticate and passive l2cap disconnect""" + logger.info(f'test_l2cap_server_case_6 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_6(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_7(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test active ACL connection,l2cap connect active + authenticate and active acl disconnect.l2cap disconnect should be successfully.""" + logger.info(f'test_l2cap_server_case_7 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_7(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_8(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test active ACL connection,l2cap connect active + authenticate and passive acl disconnect.l2cap disconnect should be successfully.""" + logger.info(f'test_l2cap_server_case_8 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_8(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_9(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test l2cap connection with max MTU(0xffff).But + the max mtu which the stack supports is (CONFIG_BT_BUF_ACL_RX_SIZE - 4U = 196).""" + logger.info(f'test_l2cap_server_case_9 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_9(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_10(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test l2cap connection with min MTU(0x30).""" + logger.info(f'test_l2cap_server_case_10 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_10(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_11(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test l2cap connection with invalid PSM.""" + logger.info(f'test_l2cap_server_case_11 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_11(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_12(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Test l2cap multi_channel connection and data transfer. + Two PSM(0x1001, 0x1003) are registered in iut which is server""" + logger.info(f'test_l2cap_server_case_12 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_12(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_13(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Stress Test. Repeat l2cap connect, disconnect operation""" + logger.info(f'test_l2cap_server_case_13 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_13(hci, shell, dut, iut_address)) + + def test_l2cap_server_case_14(self, shell: Shell, dut: DeviceAdapter, l2cap_br_dut): + """Stress Test. duplicate data transfer operation""" + logger.info(f'test_l2cap_server_case_14 {l2cap_br_dut}') + hci, iut_address = l2cap_br_dut + asyncio.run(l2cap_server_case_14(hci, shell, dut, iut_address)) diff --git a/tests/bluetooth/classic/l2cap/src/l2cap_test.c b/tests/bluetooth/classic/l2cap/src/l2cap_test.c new file mode 100644 index 000000000000..c184f5bc2980 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/src/l2cap_test.c @@ -0,0 +1,543 @@ +/* sdp_client.c - Bluetooth classic SDP client smoke test */ + +/* + * Copyright 2024 NXP + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include "host/shell/bt.h" +#include "common/bt_shell_private.h" + +#define DATA_BREDR_MTU 48 + +NET_BUF_POOL_FIXED_DEFINE(data_tx_pool, 1, BT_L2CAP_SDU_BUF_SIZE(DATA_BREDR_MTU), + CONFIG_BT_CONN_TX_USER_DATA_SIZE, NULL); +NET_BUF_POOL_FIXED_DEFINE(data_rx_pool, 1, DATA_BREDR_MTU, 8, NULL); + +struct l2cap_br_chan { + struct bt_l2cap_br_chan chan; +#if defined(CONFIG_BT_L2CAP_RET_FC) + struct k_fifo l2cap_recv_fifo; + bool hold_credit; +#endif /* CONFIG_BT_L2CAP_RET_FC */ +}; + +struct app_l2cap_br_chan { + bool active; + uint8_t id; + struct bt_conn *conn; + struct l2cap_br_chan l2cap_chan; +}; + +struct bt_l2cap_br_server { + struct bt_l2cap_server server; +#if defined(CONFIG_BT_L2CAP_RET_FC) + uint8_t options; +#endif /* CONFIG_BT_L2CAP_RET_FC */ +}; + +#define BT_L2CAP_BR_SERVER_OPT_RET BIT(0) +#define BT_L2CAP_BR_SERVER_OPT_FC BIT(1) +#define BT_L2CAP_BR_SERVER_OPT_ERET BIT(2) +#define BT_L2CAP_BR_SERVER_OPT_STREAM BIT(3) +#define BT_L2CAP_BR_SERVER_OPT_MODE_OPTIONAL BIT(4) +#define BT_L2CAP_BR_SERVER_OPT_EXT_WIN_SIZE BIT(5) +#define BT_L2CAP_BR_SERVER_OPT_HOLD_CREDIT BIT(6) + +struct app_l2cap_br_server { + bool active; + uint8_t id; + struct bt_conn *conn; + struct bt_l2cap_br_server l2cap_server; +}; + +#define APPL_L2CAP_CONNECTION_MAX_COUNT 2 +struct app_l2cap_br_chan br_l2cap[APPL_L2CAP_CONNECTION_MAX_COUNT] = {0}; +struct app_l2cap_br_server br_l2cap_server[APPL_L2CAP_CONNECTION_MAX_COUNT] = {0}; + +static int l2cap_recv(struct bt_l2cap_chan *chan, struct net_buf *buf) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF(chan, struct l2cap_br_chan, chan.chan); + struct app_l2cap_br_chan *appl_br_chan = + CONTAINER_OF(br_chan, struct app_l2cap_br_chan, l2cap_chan); + + bt_shell_print("Incoming data channel %d len %u", appl_br_chan->id, buf->len); + + if (buf->len) { + printk("Incoming data :%.*s\r\n", buf->len, buf->data); + } + +#if defined(CONFIG_BT_L2CAP_RET_FC) + if (br_chan->hold_credit) { + k_fifo_put(&br_chan->l2cap_recv_fifo, buf); + return -EINPROGRESS; + } +#endif /* CONFIG_BT_L2CAP_RET_FC */ + (void)br_chan; + + return 0; +} + +static struct net_buf *l2cap_alloc_buf(struct bt_l2cap_chan *chan) +{ + bt_shell_print("Channel %p requires buffer", chan); + + return net_buf_alloc(&data_rx_pool, K_NO_WAIT); +} + +static void l2cap_connected(struct bt_l2cap_chan *chan) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF(chan, struct l2cap_br_chan, chan.chan); + struct app_l2cap_br_chan *appl_br_chan = + CONTAINER_OF(br_chan, struct app_l2cap_br_chan, l2cap_chan); + bt_shell_print("Channel %d connected", appl_br_chan->id); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + switch (br_chan->chan.rx.mode) { + case BT_L2CAP_BR_LINK_MODE_BASIC: + bt_shell_print("It is basic mode"); + if (br_chan->hold_credit) { + br_chan->hold_credit = false; + bt_shell_warn("hold_credit is unsupported in basic mode"); + } + break; + case BT_L2CAP_BR_LINK_MODE_RET: + bt_shell_print("It is retransmission mode"); + break; + case BT_L2CAP_BR_LINK_MODE_FC: + bt_shell_print("It is flow control mode"); + break; + case BT_L2CAP_BR_LINK_MODE_ERET: + bt_shell_print("It is enhance retransmission mode"); + break; + case BT_L2CAP_BR_LINK_MODE_STREAM: + bt_shell_print("It is streaming mode"); + break; + default: + bt_shell_error("It is unknown mode"); + break; + } +#endif /* CONFIG_BT_L2CAP_RET_FC */ + (void)br_chan; +} + +static void l2cap_disconnected(struct bt_l2cap_chan *chan) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF(chan, struct l2cap_br_chan, chan.chan); + struct app_l2cap_br_chan *appl_br_chan = + CONTAINER_OF(br_chan, struct app_l2cap_br_chan, l2cap_chan); + appl_br_chan->conn = NULL; + appl_br_chan->active = false; + + bt_shell_print("Channel %d disconnected", appl_br_chan->id); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + struct net_buf *buf; + + do { + buf = k_fifo_get(&br_chan->l2cap_recv_fifo, K_NO_WAIT); + if (buf != NULL) { + net_buf_unref(buf); + } + } while (buf != NULL); +#endif /* CONFIG_BT_L2CAP_RET_FC */ +} + +static const struct bt_l2cap_chan_ops l2cap_ops = { + .alloc_buf = l2cap_alloc_buf, + .recv = l2cap_recv, + .connected = l2cap_connected, + .disconnected = l2cap_disconnected, +}; + +struct app_l2cap_br_chan *appl_br_l2cap(struct bt_conn *conn) +{ + for (uint8_t index = 0; index < APPL_L2CAP_CONNECTION_MAX_COUNT; index++) { + if (br_l2cap[index].conn == NULL && br_l2cap[index].active == false) { + br_l2cap[index].conn = conn; + br_l2cap[index].active = true; + br_l2cap[index].id = index; + br_l2cap[index].l2cap_chan.chan.chan.ops = &l2cap_ops; + br_l2cap[index].l2cap_chan.chan.rx.mtu = DATA_BREDR_MTU; +#if defined(CONFIG_BT_L2CAP_RET_FC) + k_fifo_init(&br_l2cap[index].l2cap_chan.l2cap_recv_fifo); +#endif + return &br_l2cap[index]; + } + } + return NULL; +} + +static int l2cap_accept(struct bt_conn *conn, struct bt_l2cap_server *server, + struct bt_l2cap_chan **chan) +{ + struct bt_l2cap_br_server *br_server; + struct app_l2cap_br_server *appl_l2cap_server; + struct app_l2cap_br_chan *appl_l2cap = NULL; + struct l2cap_br_chan *l2cap_chan = NULL; + + br_server = CONTAINER_OF(server, struct bt_l2cap_br_server, server); + appl_l2cap_server = CONTAINER_OF(br_server, struct app_l2cap_br_server, l2cap_server); + + appl_l2cap = appl_br_l2cap(conn); + if (!appl_l2cap) { + bt_shell_error("No channels application br chan"); + return -ENOMEM; + } + l2cap_chan = &appl_l2cap->l2cap_chan; + *chan = &l2cap_chan->chan.chan; + + bt_shell_print("Incoming BR/EDR conn %p", conn); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + if (br_server->options & BT_L2CAP_BR_SERVER_OPT_HOLD_CREDIT) { + l2cap_chan->hold_credit = true; + } else { + l2cap_chan->hold_credit = false; + } + + if (br_server->options & BT_L2CAP_BR_SERVER_OPT_EXT_WIN_SIZE) { + l2cap_chan->chan.rx.extended_control = true; + } else { + l2cap_chan->chan.rx.extended_control = false; + } + + if (br_server->options & BT_L2CAP_BR_SERVER_OPT_MODE_OPTIONAL) { + l2cap_chan->chan.rx.optional = true; + } else { + l2cap_chan->chan.rx.optional = false; + } + + l2cap_chan->chan.rx.fcs = BT_L2CAP_BR_FCS_16BIT; + + if (br_server->options & BT_L2CAP_BR_SERVER_OPT_STREAM) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_STREAM; + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; + l2cap_chan->chan.rx.max_transmit = 0; + } else if (br_server->options & BT_L2CAP_BR_SERVER_OPT_ERET) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_ERET; + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (br_server->options & BT_L2CAP_BR_SERVER_OPT_FC) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_FC; + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (br_server->options & BT_L2CAP_BR_SERVER_OPT_RET) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_RET; + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; + l2cap_chan->chan.rx.max_transmit = 3; + } +#endif /* CONFIG_BT_L2CAP_RET_FC */ + (void)br_server; + return 0; +} + +struct app_l2cap_br_server *appl_br_l2cap_server_alloc(uint16_t psm) +{ + for (uint8_t index = 0; index < APPL_L2CAP_CONNECTION_MAX_COUNT; index++) { + if (br_l2cap_server[index].conn == NULL && br_l2cap_server[index].active == false) { + br_l2cap_server[index].active = true; + br_l2cap_server[index].id = index; + br_l2cap_server[index].l2cap_server.server.psm = psm; + br_l2cap_server[index].l2cap_server.server.accept = l2cap_accept; + return &br_l2cap_server[index]; + } + } + return NULL; +} + +static int cmd_connect(const struct shell *sh, size_t argc, char *argv[]) +{ + uint16_t psm; + struct bt_conn_info info; + int err; + struct app_l2cap_br_chan *appl_l2cap = NULL; + struct l2cap_br_chan *l2cap_chan = NULL; + + if (!default_conn) { + shell_error(sh, "Not connected"); + return -ENOEXEC; + } + + appl_l2cap = appl_br_l2cap(default_conn); + if (!appl_l2cap) { + bt_shell_error("No channels application br chan"); + return -ENOMEM; + } + l2cap_chan = &appl_l2cap->l2cap_chan; + if (l2cap_chan->chan.chan.conn) { + bt_shell_error("No channels available"); + return -ENOMEM; + } + + err = bt_conn_get_info(default_conn, &info); + if ((err < 0) || (info.type != BT_CONN_TYPE_BR)) { + shell_error(sh, "Invalid conn type"); + return -ENOEXEC; + } + + psm = strtoul(argv[1], NULL, 16); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + if (!strcmp(argv[2], "basic")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_BASIC; + } else if (!strcmp(argv[2], "ret")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_RET; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (!strcmp(argv[2], "fc")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_FC; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (!strcmp(argv[2], "eret")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_ERET; + l2cap_chan->chan.rx.max_transmit = 3; + } else if (!strcmp(argv[2], "stream")) { + l2cap_chan->chan.rx.mode = BT_L2CAP_BR_LINK_MODE_STREAM; + l2cap_chan->chan.rx.max_transmit = 0; + } else { + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + + l2cap_chan->hold_credit = false; + l2cap_chan->chan.rx.optional = false; + l2cap_chan->chan.rx.extended_control = false; + + for (size_t index = 3; index < argc; index++) { + if (!strcmp(argv[index], "hold_credit")) { + l2cap_chan->hold_credit = true; + } else if (!strcmp(argv[index], "mode_optional")) { + l2cap_chan->chan.rx.optional = true; + } else if (!strcmp(argv[index], "extended_control")) { + l2cap_chan->chan.rx.extended_control = true; + } else if (!strcmp(argv[index], "sec")) { + l2cap_chan->chan.required_sec_level = strtoul(argv[++index], NULL, 16); + } else if (!strcmp(argv[index], "mtu")) { + l2cap_chan->chan.rx.mtu = strtoul(argv[++index], NULL, 16); + } else { + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + } + + if ((l2cap_chan->chan.rx.extended_control) && + ((l2cap_chan->chan.rx.mode != BT_L2CAP_BR_LINK_MODE_ERET) && + (l2cap_chan->chan.rx.mode != BT_L2CAP_BR_LINK_MODE_STREAM))) { + shell_error(sh, "[extended_control] only supports mode eret and stream"); + return -ENOEXEC; + } + + if (l2cap_chan->hold_credit && (l2cap_chan->chan.rx.mode == BT_L2CAP_BR_LINK_MODE_BASIC)) { + shell_error(sh, "[hold_credit] cannot support basic mode"); + return -ENOEXEC; + } + + l2cap_chan->chan.rx.max_window = CONFIG_BT_L2CAP_MAX_WINDOW_SIZE; +#endif /* CONFIG_BT_L2CAP_RET_FC */ + + err = bt_l2cap_chan_connect(default_conn, &l2cap_chan->chan.chan, psm); + if (err < 0) { + shell_error(sh, "Unable to connect to psm %u (err %d)", psm, err); + appl_l2cap->conn = NULL; + appl_l2cap->active = false; + } else { + shell_print(sh, "L2CAP connection pending"); + } + + return err; +} + +static int cmd_l2cap_disconnect(const struct shell *sh, size_t argc, char *argv[]) +{ + int err; + uint8_t id; + + id = strtoul(argv[1], NULL, 16); + + if (br_l2cap[id].active == true) { + err = bt_l2cap_chan_disconnect(&br_l2cap[id].l2cap_chan.chan.chan); + if (err) { + shell_error(sh, "Unable to disconnect: %u", -err); + return err; + } + } + + return 0; +} + +static int cmd_l2cap_send(const struct shell *sh, size_t argc, char *argv[]) +{ + int err, mtu_len = DATA_BREDR_MTU, data_len = 0; + uint8_t id; + struct net_buf *buf; + uint16_t len; + uint16_t send_length = 0, remaining_data = 0; + struct l2cap_br_chan *l2cap_chan = NULL; + + id = strtoul(argv[1], NULL, 16); + data_len = strtoul(argv[3], NULL, 16); + mtu_len = MIN(l2cap_chan->chan.tx.mtu, DATA_BREDR_MTU); + + shell_print(sh, "data_len = %d", data_len); + + if (br_l2cap[id].active == true) { + l2cap_chan = &br_l2cap[id].l2cap_chan; + remaining_data = data_len; + while (remaining_data > 0) { + buf = net_buf_alloc(&data_tx_pool, K_SECONDS(2)); + if (!buf) { + if (l2cap_chan->chan.state != BT_L2CAP_CONNECTED) { + shell_error(sh, "Channel disconnected, stopping TX"); + return -EAGAIN; + } + shell_error(sh, "Allocation timeout, stopping TX"); + return -EAGAIN; + } + net_buf_reserve(buf, BT_L2CAP_CHAN_SEND_RESERVE); + if (remaining_data > mtu_len) { + net_buf_add_mem(buf, argv[2] + send_length, mtu_len); + len = mtu_len; + } else { + net_buf_add_mem(buf, argv[2] + send_length, remaining_data); + len = remaining_data; + } + err = bt_l2cap_chan_send(&l2cap_chan->chan.chan, buf); + if (err < 0) { + shell_error(sh, "Unable to send: %d", -err); + net_buf_unref(buf); + return -ENOEXEC; + } + send_length += len; + remaining_data -= len; + } + } else { + shell_print(sh, "channel %d not support", id); + return -EINVAL; + } + return 0; +} + +bool l2cap_psm_registered(uint16_t psm) +{ + for (uint8_t index = 0; index < APPL_L2CAP_CONNECTION_MAX_COUNT; index++) { + if (br_l2cap_server[index].active == true && + br_l2cap_server[index].l2cap_server.server.psm == psm) { + return true; + } + } + return false; +} +static int cmd_l2cap_register(const struct shell *sh, size_t argc, char *argv[]) +{ + uint16_t psm = strtoul(argv[1], NULL, 16); + struct app_l2cap_br_server *app_l2cap_server; + struct bt_l2cap_br_server *l2cap_server; + + if (l2cap_psm_registered(psm)) { + shell_print(sh, "Already registered"); + return -ENOEXEC; + } + + app_l2cap_server = appl_br_l2cap_server_alloc(psm); + if (!app_l2cap_server) { + bt_shell_error("No channels application br chan"); + return -ENOMEM; + } + l2cap_server = &app_l2cap_server->l2cap_server; + l2cap_server->server.psm = strtoul(argv[1], NULL, 16); + +#if defined(CONFIG_BT_L2CAP_RET_FC) + l2cap_server->options = 0; + + if (!strcmp(argv[2], "basic")) { + /* Support mode: None */ + } else if (!strcmp(argv[2], "ret")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_RET; + } else if (!strcmp(argv[2], "fc")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_FC; + } else if (!strcmp(argv[2], "eret")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_ERET; + } else if (!strcmp(argv[2], "stream")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_STREAM; + } else { + l2cap_server->server.psm = 0; + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + + for (size_t index = 3; index < argc; index++) { + if (!strcmp(argv[index], "hold_credit")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_HOLD_CREDIT; + } else if (!strcmp(argv[index], "mode_optional")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_MODE_OPTIONAL; + } else if (!strcmp(argv[index], "extended_control")) { + l2cap_server->options |= BT_L2CAP_BR_SERVER_OPT_EXT_WIN_SIZE; + } else if (!strcmp(argv[index], "sec")) { + l2cap_server->server.sec_level = strtoul(argv[++index], NULL, 16); + } else { + l2cap_server->server.psm = 0; + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + } + + if ((l2cap_server->options & BT_L2CAP_BR_SERVER_OPT_EXT_WIN_SIZE) && + (!(l2cap_server->options & + (BT_L2CAP_BR_SERVER_OPT_ERET | BT_L2CAP_BR_SERVER_OPT_STREAM)))) { + shell_error(sh, "[extended_control] only supports mode eret and stream"); + l2cap_server->server.psm = 0U; + return -ENOEXEC; + } +#endif /* CONFIG_BT_L2CAP_RET_FC */ + + if (bt_l2cap_br_server_register(&l2cap_server->server) < 0) { + shell_error(sh, "Unable to register psm"); + l2cap_server->server.psm = 0U; + return -ENOEXEC; + } + + shell_print(sh, "L2CAP psm %u registered", l2cap_server->server.psm); + + return 0; +} + +SHELL_STATIC_SUBCMD_SET_CREATE( + l2cap_br_cmds, + SHELL_CMD_ARG(register, NULL, " [option]", cmd_l2cap_register, 2, 5), + SHELL_CMD_ARG(connect, NULL, " [option]", cmd_connect, 2, 3), + SHELL_CMD_ARG(disconnect, NULL, "[id]", cmd_l2cap_disconnect, 2, 0), + SHELL_CMD_ARG(send, NULL, "[id] [length of data] [data] ", cmd_l2cap_send, 4, 0), + SHELL_SUBCMD_SET_END); + +static int cmd_default_handler(const struct shell *sh, size_t argc, char **argv) +{ + if (argc == 1) { + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + + shell_error(sh, "%s unknown parameter: %s", argv[0], argv[1]); + + return -EINVAL; +} + +SHELL_CMD_REGISTER(l2cap_br, &l2cap_br_cmds, "Bluetooth classic l2cap shell commands", + cmd_default_handler); diff --git a/tests/bluetooth/classic/l2cap/testcase.yaml b/tests/bluetooth/classic/l2cap/testcase.yaml new file mode 100644 index 000000000000..3a168d312496 --- /dev/null +++ b/tests/bluetooth/classic/l2cap/testcase.yaml @@ -0,0 +1,24 @@ +tests: + bluetooth.classic.l2cap: + platform_allow: + - native_sim + integration_platforms: + - native_sim + tags: + - bluetooth + - l2cap + harness: pytest + harness_config: + pytest_dut_scope: session + fixture: usb_hci + timeout: 480 + bluetooth.classic.l2cap.no_blobs: + platform_allow: + - mimxrt1170_evk@B/mimxrt1176/cm7 + tags: + - bluetooth + - l2cap + extra_args: + - CONFIG_BUILD_ONLY_NO_BLOBS=y + timeout: 480 + build_only: true