-
Notifications
You must be signed in to change notification settings - Fork 7.3k
/
Copy pathcore.py
969 lines (777 loc) · 37.4 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
#! /usr/bin/env python3
# Copyright (c) 2017 Linaro Limited.
# Copyright (c) 2017 Open Source Foundries Limited.
#
# SPDX-License-Identifier: Apache-2.0
"""Zephyr binary runner core interfaces
This provides the core ZephyrBinaryRunner class meant for public use,
as well as some other helpers for concrete runner classes.
"""
import abc
import argparse
import errno
import logging
import os
import platform
import re
import selectors
import shlex
import shutil
import signal
import socket
import subprocess
import sys
from dataclasses import dataclass, field
from enum import Enum
from functools import partial
from inspect import isabstract
from typing import NamedTuple, NoReturn
try:
from elftools.elf.elffile import ELFFile
ELFTOOLS_MISSING = False
except ImportError:
ELFTOOLS_MISSING = True
# Turn on to enable just logging the commands that would be run (at
# info rather than debug level), without actually running them. This
# can break runners that are expecting output or if one command
# depends on another, so it's just for debugging.
_DRY_RUN = False

# Module-wide logger named 'runners'. DeprecatedAction reports deprecated
# command line options through it; each runner instance additionally gets
# its own 'runners.<name>' child logger in ZephyrBinaryRunner.__init__().
_logger = logging.getLogger('runners')
# FIXME: This helper probably belongs somewhere else, but there was no
# obviously better home for it, so it lives here for now.
# RTT blocks could also be searched for in hex or bin files, but since the
# magic string is just "SEGGER RTT", that is avoided because of the risk
# of false positives.
def find_rtt_block(elf_file: str) -> int | None:
    '''Look up the address of the ``_SEGGER_RTT`` symbol in an ELF file.

    :param elf_file: path to the ELF file to inspect
    :return: the ``st_value`` of the first ``_SEGGER_RTT`` symbol found in
        any symbol table section, or None if no such symbol exists
    :raises RuntimeError: if the elftools Python package is not installed
    '''
    if ELFTOOLS_MISSING:
        raise RuntimeError('the Python dependency elftools was missing; '
                           'see the getting started guide for details on '
                           'how to fix')

    with open(elf_file, 'rb') as stream:
        for symtab in ELFFile(stream).iter_sections('SHT_SYMTAB'):
            matches = symtab.get_symbol_by_name('_SEGGER_RTT')
            if matches is None:
                # This symbol table does not define the symbol at all.
                continue
            first = next(iter(matches), None)
            if first is not None:
                return first.entry.get('st_value')

    return None
class _DebugDummyPopen:
    '''Stand-in for subprocess.Popen handed out while _DRY_RUN is enabled.

    Only the two methods used for cleaning up a spawned server are
    provided, and both do nothing.'''

    def terminate(self):
        '''Do nothing: there is no real child process to stop.'''

    def wait(self):
        '''Do nothing: there is no real child process to wait for.'''
# Highest port number NetworkPortHelper.get_unused_ports() is willing to
# return; searching past this value raises ValueError.
MAX_PORT = 49151
class NetworkPortHelper:
    '''Helper class for dealing with local IP network ports.'''

    def get_unused_ports(self, starting_from):
        '''Find unused network ports, starting at given values.

        starting_from is an iterable of ports the caller would like to use.

        The return value is an iterable of ports, in the same order, using
        the given values if they were unused, or the next sequentially
        available unused port otherwise.

        Ports may be bound between this call's check and actual usage, so
        callers still need to handle errors involving returned ports.

        Raises ValueError if the search for a free port goes past
        MAX_PORT.'''
        start = list(starting_from)
        used = self._used_now()
        ret = []

        for desired in start:
            port = desired
            while port in used:
                port += 1
                if port > MAX_PORT:
                    msg = "ports above {} are in use"
                    raise ValueError(msg.format(desired))
            # Reserve the port so a later request in the same call cannot
            # be given the same value.
            used.add(port)
            ret.append(port)

        return ret

    def _used_now(self):
        '''Return the set of TCP ports currently in use on this host.

        Raises KeyError on platforms other than Windows, Linux and
        Darwin.'''
        handlers = {
            'Windows': self._used_now_windows,
            'Linux': self._used_now_linux,
            'Darwin': self._used_now_darwin,
        }
        handler = handlers[platform.system()]
        return handler()

    def _used_now_windows(self):
        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
        return self._parser_windows(cmd)

    def _used_now_linux(self):
        cmd = ['ss', '-a', '-n', '-t']
        return self._parser_linux(cmd)

    def _used_now_darwin(self):
        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
        return self._parser_darwin(cmd)

    @staticmethod
    def _port_of(address):
        '''Extract the port from a local address token.

        The port is whatever follows the last ':' (Windows netstat, ss)
        or the last '.' (BSD-style netstat, e.g. ``127.0.0.1.631``).
        Returns None when that tail is not a number, e.g. the ``*``
        wildcard.'''
        tail = address.replace(b'.', b':').rsplit(b':', 1)[-1]
        return int(tail) if tail.isdigit() else None

    @staticmethod
    def _local_ports(output, column, protocol=None, skip=0):
        '''Collect the local ports listed in netstat/ss style output.

        :param output: raw bytes printed by the tool
        :param column: index of the whitespace separated field holding
                       the local address
        :param protocol: if given, only rows whose first field starts
                         with these bytes are considered
        :param skip: number of leading (header) lines to ignore

        Rows that are too short or whose port is not numeric are
        skipped rather than treated as errors.'''
        ports = set()
        for line in output.splitlines()[skip:]:
            fields = line.split()
            if len(fields) <= column:
                continue
            if protocol is not None and not fields[0].startswith(protocol):
                continue
            port = NetworkPortHelper._port_of(fields[column])
            if port is not None:
                ports.add(port)
        return ports

    @staticmethod
    def _parser_windows(cmd):
        '''Run cmd (Windows netstat) and return the set of local TCP ports.'''
        out = subprocess.check_output(cmd)
        # Rows are matched on their first field so that the amount of
        # leading whitespace printed by netstat does not matter.
        return NetworkPortHelper._local_ports(out, column=1, protocol=b'TCP')

    @staticmethod
    def _parser_linux(cmd):
        '''Run cmd (ss) and return the set of local TCP ports.'''
        out = subprocess.check_output(cmd)
        # The first line is the column header.
        return NetworkPortHelper._local_ports(out, column=3, skip=1)

    @staticmethod
    def _parser_darwin(cmd):
        '''Run cmd (macOS netstat) and return the set of local TCP ports.'''
        out = subprocess.check_output(cmd)
        return NetworkPortHelper._local_ports(out, column=3, protocol=b'tcp')
class BuildConfiguration:
    '''This helper class provides access to build-time configuration.

    Configuration options can be read as if the object were a dict,
    either object['CONFIG_FOO'] or object.get('CONFIG_FOO').

    Kconfig configuration values are available (parsed from .config).'''

    # Prefix of the option names recognized in the .config file;
    # subclasses override it to parse differently prefixed files.
    config_prefix = 'CONFIG'

    def __init__(self, build_dir: str):
        '''Parse <build_dir>/zephyr/.config.

        Raises whatever open() raises if the file cannot be read.'''
        self.build_dir = build_dir
        self.options: dict[str, str | int] = {}
        self.path = os.path.join(self.build_dir, 'zephyr', '.config')
        self._parse()

    def __contains__(self, item):
        return item in self.options

    def __getitem__(self, item):
        return self.options[item]

    def get(self, option, *args):
        '''dict.get() style lookup, with an optional default value.'''
        return self.options.get(option, *args)

    def getboolean(self, option):
        '''If a boolean option is explicitly set to y or n,
        returns its value. Otherwise, falls back to False.
        '''
        value = self.options.get(option, False)
        # Options parsed as strings or integers are not booleans; do not
        # leak their raw value to callers that expect True or False.
        return value if isinstance(value, bool) else False

    def _parse(self):
        '''Fill self.options from the file at self.path.

        Values are converted as follows: quoted strings lose their
        quotes, 'y' becomes True, decimal and 0x-prefixed numbers become
        int, anything else is kept as a string, and
        '# <option> is not set' lines become False.'''
        filename = self.path
        opt_value = re.compile(f'^(?P<option>{self.config_prefix}_[A-Za-z0-9_]+)=(?P<value>.*)$')
        not_set = re.compile(f'^# (?P<option>{self.config_prefix}_[A-Za-z0-9_]+) is not set$')

        with open(filename) as f:
            for line in f:
                match = opt_value.match(line)
                if match:
                    option = match.group('option')
                    value = match.group('value').rstrip()
                    if value.startswith('"') and value.endswith('"'):
                        # A string literal should have the quotes stripped,
                        # but otherwise be left as is.
                        value = value[1:-1]
                    elif value == 'y':
                        # The character 'y' is a boolean option
                        # that is set to True.
                        value = True
                    else:
                        # Neither a string nor 'y', so try to parse it
                        # as an integer; keep the raw text if that fails.
                        try:
                            base = 16 if value.startswith('0x') else 10
                            value = int(value, base=base)
                        except ValueError:
                            pass

                    self.options[option] = value
                    continue

                match = not_set.match(line)
                if match:
                    # '# CONFIG_FOO is not set' means a boolean option is false.
                    self.options[match.group('option')] = False
class SysbuildConfiguration(BuildConfiguration):
    '''This helper class provides access to sysbuild-time configuration.

    Configuration options can be read as if the object were a dict,
    either object['SB_CONFIG_FOO'] or object.get('SB_CONFIG_FOO').

    Kconfig configuration values are available (parsed from .config).'''

    config_prefix = 'SB_CONFIG'

    def _parse(self):
        '''Parse the .config file, but only if it is actually there.

        A build that does not use sysbuild has no such file; in that
        case the options are simply left empty.'''
        if os.path.exists(self.path):
            super()._parse()
class MissingProgram(FileNotFoundError):
    '''FileNotFoundError subclass for missing program dependencies.

    No significant changes from the parent FileNotFoundError; this is
    useful for explicitly signaling that the file in question is a
    program that some class requires to proceed.

    The filename attribute contains the missing program.'''

    def __init__(self, program):
        '''Build an ENOENT error whose filename is the missing program.'''
        code = errno.ENOENT
        description = os.strerror(code)
        super().__init__(code, description, program)
# Every command name a runner may declare support for; RunnerCaps rejects
# anything outside this set.
_RUNNERCAPS_COMMANDS = {'flash', 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}


@dataclass
class RunnerCaps:
    '''This class represents a runner class's capabilities.

    Each capability is represented as an attribute with the same
    name. Flag attributes are True or False.

    Available capabilities:

    - commands: set of supported commands; default is {'flash',
      'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}.

    - dev_id: whether the runner supports device identifiers, in the form of an
      -i, --dev-id option. This is useful when the user has multiple debuggers
      connected to a single computer, in order to select which one will be used
      with the command provided.

    - mult_dev_ids: whether the runner supports multiple device identifiers
      for a single operation, allowing for bulk flashing of devices.
      Requires dev_id to be set as well.

    - flash_addr: whether the runner supports flashing to an
      arbitrary address. Default is False. If true, the runner
      must honor the --dt-flash option.

    - erase: whether the runner supports an --erase option, which
      does a mass-erase of the entire addressable flash on the target
      before flashing. On multi-core SoCs, this may only erase portions of
      flash specific to the actual target core. (This option can be useful for
      things like clearing out old settings values or other subsystem state
      that may affect the behavior of the zephyr image. It is also sometimes
      needed by SoCs which have flash-like areas that can't be sector
      erased by the underlying tool before flashing; UICR on nRF SoCs
      is one example.)

    - reset: whether the runner supports a --reset option, which
      resets the device after a flash operation is complete.

    - extload: whether the runner supports a --extload option, which
      must be given one time and is passed on to the underlying tool
      that the runner wraps.

    - tool_opt: whether the runner supports a --tool-opt (-O) option, which
      can be given multiple times and is passed on to the underlying tool
      that the runner wraps.

    - file: whether the runner supports a --file option, which specifies
      exactly the file that should be used to flash, overriding any default
      discovered in the build directory.

    - hide_load_files: whether the elf/hex/bin file arguments should be hidden.

    - rtt: whether the runner supports SEGGER RTT. This adds a --rtt-address
      option.
    '''

    # Each instance gets its own copy of the full command set by default.
    commands: set[str] = field(default_factory=lambda: set(_RUNNERCAPS_COMMANDS))
    dev_id: bool = False
    mult_dev_ids: bool = False
    flash_addr: bool = False
    erase: bool = False
    reset: bool = False
    extload: bool = False
    tool_opt: bool = False
    file: bool = False
    hide_load_files: bool = False
    # This capability exists separately from the rtt command
    # to allow other commands to use the rtt address
    rtt: bool = False

    def __post_init__(self):
        '''Validate the combination of capabilities.

        Raises RuntimeError if mult_dev_ids is set without dev_id, and
        ValueError if commands contains an unknown command name.'''
        multi_without_single = self.mult_dev_ids and not self.dev_id
        if multi_without_single:
            raise RuntimeError('dev_id must be set along mult_dev_ids')

        if self.commands.issubset(_RUNNERCAPS_COMMANDS):
            return
        raise ValueError(f'{self.commands=} contains invalid command')
def _missing_cap(cls: type['ZephyrBinaryRunner'], option: str) -> NoReturn:
    '''Reject a command line option the runner class cannot honor.

    Called when ``option`` was given on the command line but corresponds
    to a capability that runner class ``cls`` does not have.

    :raises ValueError: always
    '''
    complaint = f"{cls.name()} doesn't support {option} option"
    raise ValueError(complaint)
class FileType(Enum):
    '''Kind of binary file referred to by RunnerConfig.file_type.'''

    # Anything that is not one of the formats below; this is the
    # RunnerConfig default.
    OTHER = 0
    HEX = 1
    BIN = 2
    ELF = 3
class RunnerConfig(NamedTuple):
    '''Runner execution-time configuration.

    This is a common object shared by all runners. Individual runners
    can register specific configuration options using their
    do_add_parser() hooks.
    '''

    # application build directory
    build_dir: str
    # board definition directory
    board_dir: str
    # zephyr.elf path, or None
    elf_file: str | None
    # zephyr.exe path, or None
    exe_file: str | None
    # zephyr.hex path, or None
    hex_file: str | None
    # zephyr.bin path, or None
    bin_file: str | None
    # zephyr.uf2 path, or None
    uf2_file: str | None
    # binary file path (provided by the user), or None
    file: str | None
    # binary file type
    file_type: FileType | None = FileType.OTHER
    # path to a usable gdb
    gdb: str | None = None
    # path to a usable openocd
    openocd: str | None = None
    # add these paths to the openocd search path
    # NOTE(review): this default list object is shared by every instance
    # created without an explicit value, so it must not be mutated in place.
    openocd_search: list[str] = []
    # address of the rtt control block
    rtt_address: int | None = None
# Values accepted by the --dt-flash option. _DTFlashAction treats any of
# them that starts with 'y' (in either case) as true, the rest as false.
_YN_CHOICES = ['Y', 'y', 'N', 'n', 'yes', 'no', 'YES', 'NO']
class _DTFlashAction(argparse.Action):
    '''argparse action turning a yes/no style value into a bool.

    The result is always stored in ``namespace.dt_flash``: True when the
    given value starts with 'y' (case-insensitively), False otherwise.'''

    def __call__(self, parser, namespace, values, option_string=None):
        namespace.dt_flash = values.lower().startswith('y')
class _ToggleAction(argparse.Action):
    '''argparse action for paired ``--foo`` / ``--no-foo`` flags.

    Stores False when the option string used on the command line starts
    with ``--no-``, and True otherwise.'''

    def __call__(self, parser, args, ignored, option):
        enabled = not option.startswith('--no-')
        setattr(args, self.dest, enabled)
class DeprecatedAction(argparse.Action):
    '''argparse action that stores its value and logs a deprecation warning.

    The ``_cls`` and ``_replacement`` attributes read here are attached
    to the instance by depr_action().'''

    def __call__(self, parser, namespace, values, option_string=None):
        deprecated = f'Argument {self.option_strings[0]} is deprecated'
        # Mention the runner only when one was attached to this action.
        scope = '' if self._cls is None else f' for your runner {self._cls.name()}'
        advice = f', use {self._replacement} instead.'
        _logger.warning(deprecated + scope + advice)
        setattr(namespace, self.dest, values)
def depr_action(*args, cls=None, replacement=None, **kwargs):
    '''Create a DeprecatedAction carrying its warning details.

    :param cls: runner class to name in the warning, or None
    :param replacement: text naming the option to use instead

    All other arguments are passed on to the DeprecatedAction
    constructor unchanged.
    '''
    deprecated = DeprecatedAction(*args, **kwargs)
    deprecated._cls, deprecated._replacement = cls, replacement
    return deprecated
class ZephyrBinaryRunner(abc.ABC):
    '''Abstract superclass for binary runners (flashers, debuggers).

    **Note**: this class's API has changed relatively rarely since it
    was added, but it is not considered a stable Zephyr API, and may change
    without notice.

    With some exceptions, boards supported by Zephyr must provide
    generic means to be flashed (have a Zephyr firmware binary
    permanently installed on the device for running) and debugged
    (have a breakpoint debugger and program loader on a host
    workstation attached to a running target).

    This is supported by four top-level commands managed by the
    Zephyr build system:

    - 'flash': flash a previously configured binary to the board,
      start execution on the target, then return.

    - 'debug': connect to the board via a debugging protocol, program
      the flash, then drop the user into a debugger interface with
      symbol tables loaded from the current binary, and block until it
      exits.

    - 'debugserver': connect via a board-specific debugging protocol,
      then reset and halt the target. Ensure the user is now able to
      connect to a debug server with symbol tables loaded from the
      binary.

    - 'attach': connect to the board via a debugging protocol, then drop
      the user into a debugger interface with symbol tables loaded from
      the current binary, and block until it exits. Unlike 'debug', this
      command does not program the flash.

    This class provides an API for these commands. Every subclass is
    called a 'runner' for short. Each runner has a name (like
    'pyocd'), and declares commands it can handle (like
    'flash'). Boards (like 'nrf52dk/nrf52832') declare which runner(s)
    are compatible with them to the Zephyr build system, along with
    information on how to configure the runner to work with the board.

    The build system will then place enough information in the build
    directory to create and use runners with this class's create()
    method, which provides a command line argument parsing API. You
    can also create runners by instantiating subclasses directly.

    In order to define your own runner, you need to:

    1. Define a ZephyrBinaryRunner subclass, and implement its
       abstract methods. You may need to override capabilities().

    2. Make sure the Python module defining your runner class is
       imported, e.g. by editing this package's __init__.py (otherwise,
       get_runners() won't work).

    3. Give your runner's name to the Zephyr build system in your
       board's board.cmake.

    Additional advice:

    - If you need to import any non-standard-library modules, make sure
      to catch ImportError and defer complaints about it to a RuntimeError
      if one is missing. This avoids affecting users that don't require your
      runner, while still making it clear what went wrong to users that do
      require it that don't have the necessary modules installed.

    - If you need to ask the user something (e.g. using input()), do it
      in your create() classmethod, not do_run(). That ensures your
      __init__() really has everything it needs to call do_run(), and also
      avoids calling input() when not instantiating within a command line
      application.

    - Use self.logger to log messages using the standard library's
      logging API; your logger is named "runners.<your-runner-name()>"

    For command-line invocation from the Zephyr build system, runners
    define their own argparse-based interface through the common
    add_parser() (and runner-specific do_add_parser() it delegates
    to), and provide a way to create instances of themselves from
    a RunnerConfig and parsed runner-specific arguments via create().

    Runners use a variety of host tools and configuration values, the
    user interface to which is abstracted by this class. Each runner
    subclass should take any values it needs to execute one of these
    commands in its constructor. The actual command execution is
    handled in the run() method.'''

    def __init__(self, cfg: RunnerConfig):
        '''Initialize core runner state.'''

        self.cfg = cfg
        '''RunnerConfig for this instance.'''

        self.logger = logging.getLogger(f'runners.{self.name()}')
        '''logging.Logger for this instance.'''

    @staticmethod
    def get_runners() -> list[type['ZephyrBinaryRunner']]:
        '''Get all currently defined, non-abstract runner classes.

        The result is built by walking the subclass tree of
        ZephyrBinaryRunner and is currently returned as a set.'''
        def inheritors(klass):
            subclasses = set()
            work = [klass]
            while work:
                parent = work.pop()
                for child in parent.__subclasses__():
                    if child not in subclasses:
                        # Abstract intermediate classes are not runners
                        # themselves, but their children may be.
                        if not isabstract(child):
                            subclasses.add(child)
                        work.append(child)
            return subclasses

        return inheritors(ZephyrBinaryRunner)

    @classmethod
    @abc.abstractmethod
    def name(cls) -> str:
        '''Return this runner's user-visible name.

        When choosing a name, pick something short and lowercase,
        based on the name of the tool (like openocd, jlink, etc.) or
        the target architecture/board (like xtensa etc.).'''

    @classmethod
    def capabilities(cls) -> RunnerCaps:
        '''Returns a RunnerCaps representing this runner's capabilities.

        This implementation returns the default capabilities.

        Subclasses should override appropriately if needed.'''
        return RunnerCaps()

    @classmethod
    def add_parser(cls, parser):
        '''Adds a sub-command parser for this runner.

        The given object, parser, is a sub-command parser from the
        argparse module. For more details, refer to the documentation
        for argparse.ArgumentParser.add_subparsers().

        The lone common optional argument is:

        * --dt-flash (if the runner capabilities includes flash_addr)

        Runner-specific options are added through the do_add_parser()
        hook.'''
        # Unfortunately, the parser argument's type is not documented
        # in typeshed, so we can't type annotate much here.

        # Common options that depend on runner capabilities. If a
        # capability is not supported, the option string or strings
        # are added anyway, to prevent an individual runner class from
        # using them to mean something else.
        caps = cls.capabilities()

        if caps.dev_id:
            action = 'append' if caps.mult_dev_ids else 'store'
            parser.add_argument('-i', '--dev-id',
                                action=action,
                                dest='dev_id',
                                help=cls.dev_id_help())
        else:
            parser.add_argument('-i', '--dev-id', help=argparse.SUPPRESS)

        if caps.flash_addr:
            parser.add_argument('--dt-flash', default=False, choices=_YN_CHOICES,
                                action=_DTFlashAction,
                                help='''If 'yes', try to use flash address
                                information from devicetree when flash
                                addresses are unknown (e.g. when flashing a .bin)''')
        else:
            parser.add_argument('--dt-flash', help=argparse.SUPPRESS)

        if caps.file:
            parser.add_argument('-f', '--file',
                                dest='file',
                                help="path to binary file")
            parser.add_argument('-t', '--file-type',
                                dest='file_type',
                                help="type of binary file")
        else:
            parser.add_argument('-f', '--file', help=argparse.SUPPRESS)
            parser.add_argument('-t', '--file-type', help=argparse.SUPPRESS)

        if caps.hide_load_files:
            parser.add_argument('--elf-file', help=argparse.SUPPRESS)
            parser.add_argument('--hex-file', help=argparse.SUPPRESS)
            parser.add_argument('--bin-file', help=argparse.SUPPRESS)
        else:
            # When the runner supports --file, the per-format options
            # still work but warn that they are deprecated.
            parser.add_argument('--elf-file',
                                metavar='FILE',
                                action=(partial(depr_action, cls=cls,
                                                replacement='-f/--file') if caps.file else None),
                                help='path to zephyr.elf'
                                if not caps.file else 'Deprecated, use -f/--file instead.')
            parser.add_argument('--hex-file',
                                metavar='FILE',
                                action=(partial(depr_action, cls=cls,
                                                replacement='-f/--file') if caps.file else None),
                                help='path to zephyr.hex'
                                if not caps.file else 'Deprecated, use -f/--file instead.')
            parser.add_argument('--bin-file',
                                metavar='FILE',
                                action=(partial(depr_action, cls=cls,
                                                replacement='-f/--file') if caps.file else None),
                                help='path to zephyr.bin'
                                if not caps.file else 'Deprecated, use -f/--file instead.')

        parser.add_argument('--erase', '--no-erase', nargs=0,
                            action=_ToggleAction,
                            help=("mass erase flash before loading, or don't. "
                                  "Default action depends on each specific runner."
                                  if caps.erase else argparse.SUPPRESS))

        parser.add_argument('--reset', '--no-reset', nargs=0,
                            action=_ToggleAction,
                            help=("reset device after flashing, or don't. "
                                  "Default action depends on each specific runner."
                                  if caps.reset else argparse.SUPPRESS))

        parser.add_argument('--extload', dest='extload',
                            help=(cls.extload_help() if caps.extload
                                  else argparse.SUPPRESS))

        parser.add_argument('-O', '--tool-opt', dest='tool_opt',
                            default=[], action='append',
                            help=(cls.tool_opt_help() if caps.tool_opt
                                  else argparse.SUPPRESS))

        if caps.rtt:
            parser.add_argument('--rtt-address', dest='rtt_address',
                                type=lambda x: int(x, 0),
                                help="""address of RTT control block. If not supplied,
                                it will be autodetected if possible""")
        else:
            parser.add_argument('--rtt-address', help=argparse.SUPPRESS)

        # Runner-specific options.
        cls.do_add_parser(parser)

    @classmethod
    @abc.abstractmethod
    def do_add_parser(cls, parser):
        '''Hook for adding runner-specific options.'''

    @classmethod  # noqa: B027
    def args_from_previous_runner(cls, previous_runner,
                                  args: argparse.Namespace):
        '''Update arguments from a previously created runner.

        This is intended for propagating relevant user responses
        between multiple runs of the same runner, for example a
        JTAG serial number.'''

    @classmethod
    def create(cls, cfg: RunnerConfig,
               args: argparse.Namespace) -> 'ZephyrBinaryRunner':
        '''Create an instance from command-line arguments.

        - ``cfg``: runner configuration (pass to superclass __init__)
        - ``args``: arguments parsed from execution environment, as
          specified by ``add_parser()``.

        Raises ValueError if an option was given which this runner's
        capabilities do not support, or if --file-type was given
        without --file.'''
        caps = cls.capabilities()
        if args.dev_id and not caps.dev_id:
            _missing_cap(cls, '--dev-id')
        if args.dt_flash and not caps.flash_addr:
            _missing_cap(cls, '--dt-flash')
        if args.erase and not caps.erase:
            _missing_cap(cls, '--erase')
        if args.reset and not caps.reset:
            _missing_cap(cls, '--reset')
        if args.extload and not caps.extload:
            _missing_cap(cls, '--extload')
        if args.tool_opt and not caps.tool_opt:
            _missing_cap(cls, '--tool-opt')
        if args.file and not caps.file:
            _missing_cap(cls, '--file')
        if args.file_type and not args.file:
            raise ValueError("--file-type requires --file")
        if args.file_type and not caps.file:
            _missing_cap(cls, '--file-type')
        if args.rtt_address and not caps.rtt:
            _missing_cap(cls, '--rtt-address')

        ret = cls.do_create(cfg, args)
        if args.erase:
            ret.logger.info('mass erase requested')
        if args.reset:
            ret.logger.info('reset after flashing requested')
        return ret

    @classmethod
    @abc.abstractmethod
    def do_create(cls, cfg: RunnerConfig,
                  args: argparse.Namespace) -> 'ZephyrBinaryRunner':
        '''Hook for instance creation from command line arguments.'''

    @staticmethod
    def get_flash_address(args: argparse.Namespace,
                          build_conf: BuildConfiguration,
                          default: int = 0x0) -> int:
        '''Helper method for extracting a flash address.

        If args.dt_flash is true, returns the address obtained from
        ZephyrBinaryRunner.flash_address_from_build_conf(build_conf).

        Otherwise (when args.dt_flash is False), the default value is
        returned.'''
        if args.dt_flash:
            return ZephyrBinaryRunner.flash_address_from_build_conf(build_conf)
        else:
            return default

    @staticmethod
    def flash_address_from_build_conf(build_conf: BuildConfiguration):
        '''If CONFIG_HAS_FLASH_LOAD_OFFSET is n in build_conf,
        return the CONFIG_FLASH_BASE_ADDRESS value. Otherwise, return
        CONFIG_FLASH_BASE_ADDRESS + CONFIG_FLASH_LOAD_OFFSET.
        '''
        if build_conf.getboolean('CONFIG_HAS_FLASH_LOAD_OFFSET'):
            return (build_conf['CONFIG_FLASH_BASE_ADDRESS'] +
                    build_conf['CONFIG_FLASH_LOAD_OFFSET'])
        else:
            return build_conf['CONFIG_FLASH_BASE_ADDRESS']

    def run(self, command: str, **kwargs):
        '''Runs command ('flash', 'debug', 'debugserver', 'attach').

        This is the main entry point to this runner.

        Raises ValueError if command is not among the commands listed
        in this runner's capabilities.'''
        caps = self.capabilities()
        if command not in caps.commands:
            raise ValueError(f'runner {self.name()} does not implement command {command}')
        self.do_run(command, **kwargs)

    @abc.abstractmethod
    def do_run(self, command: str, **kwargs):
        '''Concrete runner; run() delegates to this. Implement in subclasses.

        In case of an unsupported command, raise a ValueError.'''

    @property
    def build_conf(self) -> BuildConfiguration:
        '''Get a BuildConfiguration for the build directory.'''
        # Parsed on first access, then cached on the instance.
        if not hasattr(self, '_build_conf'):
            self._build_conf = BuildConfiguration(self.cfg.build_dir)
        return self._build_conf

    @property
    def sysbuild_conf(self) -> SysbuildConfiguration:
        '''Get a SysbuildConfiguration for the sysbuild directory.'''
        # The sysbuild directory is the parent of the build directory.
        if not hasattr(self, '_sysbuild_conf'):
            self._sysbuild_conf = SysbuildConfiguration(os.path.dirname(self.cfg.build_dir))
        return self._sysbuild_conf

    @property
    def thread_info_enabled(self) -> bool:
        '''Returns True if self.build_conf has
        CONFIG_DEBUG_THREAD_INFO enabled.
        '''
        return self.build_conf.getboolean('CONFIG_DEBUG_THREAD_INFO')

    @classmethod
    def dev_id_help(cls) -> str:
        ''' Get the ArgParse help text for the --dev-id option.'''
        text = '''Device identifier. Use it to select
                  which debugger, device, node or instance to
                  target when multiple ones are available or
                  connected.'''
        addendum = '''\nThis option can be present multiple times.''' if \
            cls.capabilities().mult_dev_ids else ''
        return text + addendum

    @classmethod
    def extload_help(cls) -> str:
        ''' Get the ArgParse help text for the --extload option.'''
        return '''External loader to be used by stm32cubeprogrammer
                  to program the targeted external memory.
                  The runner requires the external loader (*.stldr) filename.
                  This external loader (*.stldr) must be located within
                  STM32CubeProgrammer/bin/ExternalLoader directory.'''

    @classmethod
    def tool_opt_help(cls) -> str:
        ''' Get the ArgParse help text for the --tool-opt option.'''
        return '''Option to pass on to the underlying tool used
                  by this runner. This can be given multiple times;
                  the resulting arguments will be given to the tool
                  in the order they appear on the command line.'''

    @staticmethod
    def require(program: str, path: str | None = None) -> str:
        '''Require that a program is installed before proceeding.

        :param program: name of the program that is required,
                        or path to a program binary.
        :param path:    PATH where to search for the program binary.
                        By default check on the system PATH.

        If ``program`` is an absolute path to an existing program
        binary, this call succeeds. Otherwise, try to find the program
        by name on the system PATH or in the given PATH, if provided.

        If the program can be found, its path is returned.
        Otherwise, raises MissingProgram.'''
        ret = shutil.which(program, path=path)
        if ret is None:
            raise MissingProgram(program)
        return ret

    def get_rtt_address(self) -> int | None:
        '''Helper method for extracting the RTT control block address.

        If args.rtt_address was supplied, returns that.

        Otherwise, attempt to locate an rtt block in the elf file.
        If this is not found, None is returned'''
        if self.cfg.rtt_address is not None:
            return self.cfg.rtt_address
        elif self.cfg.elf_file is not None:
            return find_rtt_block(self.cfg.elf_file)
        return None

    def run_server_and_client(self, server, client, **kwargs):
        '''Run a server that ignores SIGINT, and a client that handles it.

        This routine portably:

        - creates a Popen object for the ``server`` command which ignores
          SIGINT
        - runs ``client`` in a subprocess while temporarily ignoring SIGINT
        - cleans up the server after the client exits.
        - the keyword arguments, if any, will be passed down to both server and
          client subprocess calls

        It's useful to e.g. open a GDB server and client.'''
        server_proc = self.popen_ignore_int(server, **kwargs)
        try:
            self.run_client(client, **kwargs)
        finally:
            server_proc.terminate()
            server_proc.wait()

    def run_client(self, client, **kwargs):
        '''Run a client that handles SIGINT.'''
        # Ignore SIGINT in this process while the client runs, and put
        # the previous handler back afterwards, whatever happens.
        previous = signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            self.check_call(client, **kwargs)
        finally:
            signal.signal(signal.SIGINT, previous)

    def _log_cmd(self, cmd: list[str]):
        '''Log cmd, shell-quoted: at debug level normally, at info level
        when _DRY_RUN is enabled.'''
        escaped = ' '.join(shlex.quote(s) for s in cmd)
        if not _DRY_RUN:
            self.logger.debug(escaped)
        else:
            self.logger.info(escaped)

    def call(self, cmd: list[str], **kwargs) -> int:
        '''Subclass subprocess.call() wrapper.

        Subclasses should use this method to run command in a
        subprocess and get its return code, rather than
        using subprocess directly, to keep accurate debug logs.
        '''
        self._log_cmd(cmd)
        if _DRY_RUN:
            return 0
        return subprocess.call(cmd, **kwargs)

    def check_call(self, cmd: list[str], **kwargs):
        '''Subclass subprocess.check_call() wrapper.

        Subclasses should use this method to run command in a
        subprocess and check that it executed correctly, rather than
        using subprocess directly, to keep accurate debug logs.
        '''
        self._log_cmd(cmd)
        if _DRY_RUN:
            return
        subprocess.check_call(cmd, **kwargs)

    def check_output(self, cmd: list[str], **kwargs) -> bytes:
        '''Subclass subprocess.check_output() wrapper.

        Subclasses should use this method to run command in a
        subprocess and check that it executed correctly, rather than
        using subprocess directly, to keep accurate debug logs.
        '''
        self._log_cmd(cmd)
        if _DRY_RUN:
            return b''
        return subprocess.check_output(cmd, **kwargs)

    def popen_ignore_int(self, cmd: list[str], **kwargs) -> subprocess.Popen:
        '''Spawn a child command, ensuring it ignores SIGINT.

        The returned subprocess.Popen object must be manually terminated.'''
        cflags = 0
        preexec = None
        system = platform.system()

        if system == 'Windows':
            # We can't type check this line on Unix operating systems:
            # mypy thinks the subprocess module has no such attribute.
            cflags |= subprocess.CREATE_NEW_PROCESS_GROUP  # type: ignore
        elif system in {'Linux', 'Darwin'}:
            # We can't type check this on Windows for the same reason.
            preexec = os.setsid  # type: ignore

        self._log_cmd(cmd)
        if _DRY_RUN:
            return _DebugDummyPopen()  # type: ignore

        return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec, **kwargs)

    def ensure_output(self, output_type: str) -> None:
        '''Ensure self.cfg has a particular output artifact.

        For example, ensure_output('bin') ensures that self.cfg.bin_file
        refers to an existing file. Errors out if it's missing or undefined.

        :param output_type: string naming the output type
        :raises RuntimeError: if the file is unknown or does not exist
        '''
        output_file = getattr(self.cfg, f'{output_type}_file', None)

        if output_file is None:
            err = f'{output_type} file location is unknown.'
        elif not os.path.isfile(output_file):
            err = f'{output_file} does not exist.'
        else:
            return

        if output_type in ('elf', 'hex', 'bin', 'uf2'):
            err += f' Try enabling CONFIG_BUILD_OUTPUT_{output_type.upper()}.'

        # RuntimeError avoids a stack trace saved in run_common.
        raise RuntimeError(err)

    def run_telnet_client(self, host: str, port: int, active_sock=None) -> None:
        '''
        Run a telnet client for user interaction.

        :param host: host to connect to
        :param port: TCP port to connect to
        :param active_sock: already connected socket to use instead of
                            opening a new connection; it is left open
                            for the caller to close

        If no socket is passed in and ``nc`` is available, it is run and
        this method returns when it exits. Otherwise data is relayed
        between stdin and the socket until the peer closes the
        connection.
        '''
        # If the caller passed in an active socket, use that
        if active_sock is not None:
            sock = active_sock
        elif shutil.which('nc') is not None:
            # If a `nc` command is available, run it, as it will provide the
            # best support for CONFIG_SHELL_VT100_COMMANDS etc.
            client_cmd = ['nc', host, str(port)]
            # Note: netcat (nc) does not handle sigint, so cannot use run_client()
            self.check_call(client_cmd)
            return
        else:
            # Start a new socket connection
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect((host, port))
            except OSError:
                sock.close()
                raise

        # Only a socket opened here is ours to close.
        owns_sock = active_sock is None

        # Otherwise, use a pure python implementation. This will work well for logging,
        # but input is line based only.
        sel = selectors.DefaultSelector()
        try:
            sel.register(sys.stdin, selectors.EVENT_READ)
            sel.register(sock, selectors.EVENT_READ)
            while True:
                events = sel.select()
                for key, _ in events:
                    if key.fileobj == sys.stdin:
                        text = sys.stdin.readline()
                        if text:
                            sock.send(text.encode())
                        else:
                            # EOF on stdin: it would be reported as
                            # readable forever, so stop polling it and
                            # keep printing what the target sends.
                            sel.unregister(sys.stdin)

                    elif key.fileobj == sock:
                        resp = sock.recv(2048)
                        if not resp:
                            # An empty read means the peer closed the
                            # connection; without returning here the
                            # loop would spin on a dead socket.
                            return
                        print(resp.decode())
        finally:
            sel.close()
            if owns_sock:
                sock.close()