# Copyright 2021 Intel Corporation.
# Copyright 2021 Hugging Face Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function, unicode_literals
from getpass import getpass
from random import getrandbits
from binascii import hexlify
import sys
import platform
import subprocess
import os
from os.path import expanduser
import re
import glob
from argparse import ArgumentParser, REMAINDER
from argparse import RawTextHelpFormatter
import logging
import psutil
from utils import CPUinfo
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
LOGGER = logging.getLogger(__name__)
r"""
This is a script for launching PyTorch training and inference on Intel Xeon CPU with optimal configurations.
Now, single instance inference/training, multi-instance inference/training and distributed training
with oneCCL backend is enabled.
To get the peak performance on Intel Xeon CPU, the script optimizes the configuration of thread and memory
management. For thread management, the script configures thread affinity and the preload of Intel OMP library.
For memory management, it configures NUMA binding and preload optimized memory allocation library (e.g. tcmalloc, jemalloc).
**How to use this module:**
*** Single instance inference/training ***
1. Run single-instance inference or training on a single node with all CPU sockets.
::
>>> python -m intel_pytorch_extension.launch script.py args
2. Run single-instance inference or training on a single CPU socket.
::
>>> python -m intel_pytorch_extension.launch --socket_id 1 script.py args
*** Multi-instance inference ***
1. Multi-instance
By default, one instance per socket. if you want to set the instance numbers and core per instance, --nintances and --ncore_per_instance should be set.
>>> python -m intel_pytorch_extension.launch --multi_instance python_script args
eg: on CLX8280 with 14 instance, 4 cores per instance
::
>>> python -m intel_pytorch_extension.launch --multi_instance --nintances 14 --ncore_per_instance 4 python_script args
*** Distributed Training ***
spawns up multiple distributed training processes on each of the training nodes. For intel_pytorch_extension, oneCCL
is used as the communication backend and MPI used to launch multi-proc. To get the better
performance, you should specify the different cores for oneCCL communication and computation
process seperately. This tool can automatically set these ENVs(such as I_MPI_PIN_DOMIN) and launch
multi-proc for you.
The utility can be used for single-node distributed training, in which one or
more processes per node will be spawned. It can also be used in
multi-node distributed training, by spawning up multiple processes on each node
for well-improved multi-node distributed training performance as well.
1. Single-Node multi-process distributed training
::
>>> python -m intel_pytorch_extension.launch --distributed python_script --arg1 --arg2 --arg3 and all other
arguments of your training script
2. Multi-Node multi-process distributed training: (e.g. two nodes)
rank 0: *(IP: 192.168.10.10, and has a free port: 295000)*
::
>>> python -m intel_pytorch_extension.launch --distributed --nproc_per_node=xxx
--nnodes=2 --hostfile hostfile python_sript --arg1 --arg2 --arg3
and all other arguments of your training script)
3. To look up what optional arguments this module offers:
::
>>> python -m intel_pytorch_extension.launch --help
*** Memory allocator ***
"--enable_tcmalloc" and "--enable_jemalloc" can be used to enable different memory allcator.
"""
SUDOER_PASSWORD = None
THP_ALLOWED_VALUES = {'always', 'never', 'madvise'}
THP_COMMON_LOCATION = "/sys/kernel/mm/transparent_hugepage/enabled"
THP_REDHAT_LOCATION = "/sys/kernel/mm/redhat_transparent_hugepage/enabled"
THP_LOCATION = THP_REDHAT_LOCATION if os.path.exists(THP_REDHAT_LOCATION) else THP_COMMON_LOCATION
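# The THP sysfs entry reports the active policy with brackets around the current value,
# e.g. "always [madvise] never"; get_transparent_huge_pages() below extracts the bracketed token.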
def get_transparent_huge_pages():
    if os.path.exists(THP_LOCATION):
        with open(THP_LOCATION) as f:
            thp_status = f.read().rstrip()  # Remove trailing newline
        thp_value = re.search(r"\[(.*)\]", thp_status)
        if thp_value is not None and thp_value.group(1) in THP_ALLOWED_VALUES:
            return thp_value.group(1)
    return None
def set_transparent_huge_pages(thp_value, elevation_pwd=None):
    if not thp_value or thp_value not in THP_ALLOWED_VALUES:
        print(f"Provided THP value {thp_value!r} is not valid")
        return
    if os.path.exists(THP_LOCATION):
        # Clear memory cache on kernel level
        print("Clearing kernel memory cache: 'echo 3 > /proc/sys/vm/drop_caches'")
        code = subprocess.call(f'echo {elevation_pwd} | sudo -S sh -c "sync;echo 3 > /proc/sys/vm/drop_caches"', shell=True)
        if code != 0:
            print(f"Unable to clear kernel memory cache, return code={code}")
        # Explicitly ask for huge pages
        print(f'Setting Transparent Huge Pages status: "echo {thp_value} > {THP_LOCATION}"')
        code = subprocess.call(f'echo {elevation_pwd} | sudo -S sh -c "echo {thp_value} > {THP_LOCATION}"', shell=True)
        if code != 0:
            print(f"Unable to set kernel transparent huge pages, return code={code}")
    else:
        print("Warning: Unable to set Transparent Huge Pages.")
def set_mpi_pin_domain(args):
    """
    I_MPI_PIN_DOMAIN specifies the cores used by every MPI process.
    The first ccl_worker_count cores of every rank are reserved for ccl communication
    and the remaining cores are used for computation.
    For example, on a CascadeLake 8280 CPU with 2 ranks on one node and ccl_worker_count=4:
    CCL_WORKER_COUNT=4
    CCL_WORKER_AFFINITY="0,1,2,3,28,29,30,31"
    I_MPI_PIN_DOMAIN=[0xffffff0,0xffffff00000000]
    """
cpuinfo = CPUinfo()
ppn = args.nproc_per_node
total_cores = cpuinfo.physical_core_nums
if args.use_logical_core:
total_cores = cpuinfo.logical_core_nums
cores_per_rank = total_cores // ppn
pin_domain = "["
for proc in range(ppn):
domain_binary = 0
begin = proc * cores_per_rank + args.ccl_worker_count
end = proc * cores_per_rank + cores_per_rank - 1
for i in range(begin, end + 1):
domain_binary |= (1 << i)
pin_domain += hex(domain_binary) + ","
return pin_domain + "]"
def set_ccl_worker_affinity(args):
    """
    Computation and communication use different cores when using the oneCCL
    backend for distributed training. The first ccl_worker_count cores of
    every rank are used for ccl communication.
    """
cpuinfo = CPUinfo()
ppn = args.nproc_per_node
total_cores = cpuinfo.physical_core_nums
if args.use_logical_core:
total_cores = cpuinfo.logical_core_nums
cores_per_rank = total_cores // ppn
affinity = ''
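    # e.g. with 2 ranks, 28 cores per rank and ccl_worker_count=4 this yields "0,1,2,3,28,29,30,31,"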
for proc in range(ppn):
for ccl_worker in range(args.ccl_worker_count):
affinity += str(proc * cores_per_rank + ccl_worker) + ","
os.environ["CCL_WORKER_AFFINITY"] = affinity
def add_lib_preload(lib_type=None):
    """
    Preload TCMalloc / JeMalloc / Intel OpenMP (iomp) by adding the library to LD_PRELOAD.
    """
library_paths = []
    # A custom library location can be provided through the $<LIB_TYPE>_LIBRARY_PATH environment variable
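    # e.g. TCMALLOC_LIBRARY_PATH, JEMALLOC_LIBRARY_PATH or IOMP5_LIBRARY_PATH, matching the lib_type values used by the callers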
if f"{lib_type.upper()}_LIBRARY_PATH" in os.environ:
library_paths.append(os.environ[f"{lib_type.upper()}_LIBRARY_PATH"])
if "CONDA_PREFIX" in os.environ:
library_paths.append(os.environ["CONDA_PREFIX"] + "/lib/")
library_paths += [
f"{expanduser('~')}/.local/lib/",
"/usr/local/lib/",
"/usr/local/lib64/",
"/usr/lib/",
"/usr/lib64/"
]
lib_find = False
for lib_path in library_paths:
if not lib_path.endswith("/"):
lib_path += "/"
library_file = lib_path + "lib" + lib_type + ".so"
matches = glob.glob(library_file)
if len(matches) > 0:
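            # Prepend (or set) LD_PRELOAD so the selected library is resolved ahead of the default system one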
if "LD_PRELOAD" in os.environ:
os.environ["LD_PRELOAD"] = matches[0] + ":" + os.environ["LD_PRELOAD"]
else:
os.environ["LD_PRELOAD"] = matches[0]
print(f"{lib_type} found at: {matches}")
lib_find = True
break
return lib_find
def set_memory_allocator(args):
if args.enable_tcmalloc and args.enable_jemalloc:
LOGGER.error("Unable to enable TCMalloc and JEMalloc at the same time")
exit(-1)
if args.enable_tcmalloc:
find_tc = add_lib_preload(lib_type="tcmalloc")
if not find_tc:
            LOGGER.warning(
                "Unable to find the TCMalloc library file libtcmalloc.so in $CONDA_PREFIX/lib, ~/.local/lib/,"
                " /usr/local/lib/, /usr/local/lib64/, /usr/lib or /usr/lib64,"
                " so the LD_PRELOAD environment variable will not be set."
            )
args.additional_benchmark_args.append("+malloc=std")
else:
LOGGER.info("Use TCMalloc memory allocator")
args.additional_benchmark_args.append("+malloc=tcmalloc")
elif args.enable_jemalloc:
find_je = add_lib_preload(lib_type="jemalloc")
if not find_je:
            LOGGER.warning(
                "Unable to find the JeMalloc library file libjemalloc.so in $CONDA_PREFIX/lib, ~/.local/lib/,"
                " /usr/local/lib/, /usr/local/lib64/, /usr/lib or /usr/lib64,"
                " so the LD_PRELOAD environment variable will not be set."
            )
args.additional_benchmark_args.append("+malloc=std")
else:
LOGGER.info("Use JeMalloc memory allocator")
args.additional_benchmark_args.append("+malloc=jemalloc")
if "MALLOC_CONF" not in os.environ:
os.environ["MALLOC_CONF"] = args.malloc_conf
LOGGER.info("MALLOC_CONF={}".format(os.environ["MALLOC_CONF"]))
elif args.use_default_allocator:
args.additional_benchmark_args.append("+malloc=std")
else:
find_tc = add_lib_preload(lib_type="tcmalloc")
if find_tc:
LOGGER.info("Use TCMalloc memory allocator")
args.additional_benchmark_args.append("+malloc=tcmalloc")
if "MALLOC_CONF" not in os.environ:
os.environ["MALLOC_CONF"] = args.malloc_conf
LOGGER.info("MALLOC_CONF={}".format(os.environ["MALLOC_CONF"]))
return
find_je = add_lib_preload(lib_type="jemalloc")
if find_je:
LOGGER.info("Use JeMalloc memory allocator")
args.additional_benchmark_args.append("+malloc=jemalloc")
if "MALLOC_CONF" not in os.environ:
os.environ["MALLOC_CONF"] = args.malloc_conf
LOGGER.info("MALLOC_CONF={}".format(os.environ["MALLOC_CONF"]))
return
        LOGGER.warning(
            "Neither TCMalloc nor JeMalloc was found in $CONDA_PREFIX/lib, ~/.local/lib/,"
            " /usr/local/lib/, /usr/local/lib64/, /usr/lib or /usr/lib64,"
            " so the LD_PRELOAD environment variable will not be set."
            " This may degrade performance."
        )
        args.additional_benchmark_args.append("+malloc=std")
def set_multi_thread_and_allocator(args):
    global SUDOER_PASSWORD
    set_memory_allocator(args)
    if args.enable_thp:
        SUDOER_PASSWORD = getpass("Setting Transparent Huge Pages requires elevated privileges.\nPassword:")
        set_transparent_huge_pages("always", SUDOER_PASSWORD)
    if "THP_STATUS" not in os.environ:
        os.environ["THP_STATUS"] = get_transparent_huge_pages() or "unknown"
if "OMP_NUM_THREADS" not in os.environ:
os.environ["OMP_NUM_THREADS"] = str(args.ncore_per_instance)
elif "OMP_NUM_THREADS" in os.environ:
args.ncore_per_instance = int(os.environ["OMP_NUM_THREADS"])
if "OMP_MAX_ACTIVE_LEVELS" not in os.environ:
os.environ["OMP_MAX_ACTIVE_LEVELS"] = str(args.omp_max_active_levels)
else:
args.omp_max_active_levels = int(os.environ["OMP_MAX_ACTIVE_LEVELS"])
if "KMP_AFFINITY" not in os.environ:
os.environ["KMP_AFFINITY"] = args.kmp_affinity
if "KMP_BLOCKTIME" not in os.environ:
os.environ["KMP_BLOCKTIME"] = args.kmp_blocktime
if "DNNL_PRIMITIVE_CACHE_CAPACITY" not in os.environ:
os.environ["DNNL_PRIMITIVE_CACHE_CAPACITY"] = '1024'
LOGGER.info(f"OMP_NUM_THREADS={os.environ['OMP_NUM_THREADS']}")
LOGGER.info(f"OMP_MAX_ACTIVE_LEVELS={os.environ['OMP_MAX_ACTIVE_LEVELS']}")
LOGGER.info(f"KMP_AFFINITY={os.environ['KMP_AFFINITY']}")
LOGGER.info(f"KMP_BLOCKTIME={os.environ['KMP_BLOCKTIME']}")
LOGGER.info(f"DNNL_PRIMITIVE_CACHE_CAPACITY={os.environ['DNNL_PRIMITIVE_CACHE_CAPACITY']}")
omp_backend = "default"
if args.enable_iomp:
find_iomp = add_lib_preload(lib_type="iomp5")
if not find_iomp:
LOGGER.warning("Unable to find the {} library file lib{}.so in $CONDA_PREFIX/lib or /.local/lib/"
" or /usr/local/lib/ or /usr/local/lib64/ or /usr/lib or /usr/lib64 or "
"~/.local/lib/ so the LD_PRELOAD environment variable will not be set."
.format("iomp", "iomp", expanduser("~")))
else:
omp_backend = "iomp"
# Add any additional argument for benchmark script
args.additional_benchmark_args.append(f"backend.num_threads={os.environ['OMP_NUM_THREADS']}")
args.additional_benchmark_args.append(f"+openmp.backend={omp_backend}")
args.additional_benchmark_args.append(f"+openmp.num_threads={os.environ['OMP_NUM_THREADS']}")
args.additional_benchmark_args.append(f"+openmp.max_active_levels={os.environ['OMP_MAX_ACTIVE_LEVELS']}")
args.additional_benchmark_args.append(f'+openmp.affinity="{os.environ["KMP_AFFINITY"]}"')
args.additional_benchmark_args.append(f"+openmp.blocktime={os.environ['KMP_BLOCKTIME']}")
args.additional_benchmark_args.append(f"use_huge_page={os.environ['THP_STATUS']}")
def launch(args):
"""
single-instance / multi-instance launcher
"""
cores, processes = [], []
cpuinfo = CPUinfo()
if args.core_list: # user specify what cores will be used by params
cores = args.core_list.strip().split(",")
if args.ncore_per_instance == -1:
LOGGER.error("please specify the '--ncore_per_instance' if you have pass the --core_list params")
exit(-1)
elif args.ninstances > 1 and args.ncore_per_instance * args.ninstances < len(cores):
LOGGER.warning("only first {} cores will be used, but you specify {} cores in core_list".format
(args.ncore_per_instance * args.ninstances, len(cores)))
else:
args.ninstances = len(cores) // args.ncore_per_instance
else:
if args.use_logical_core:
if args.socket_id != -1:
cores = cpuinfo.get_socket_logical_cores(args.socket_id)
else:
cores = cpuinfo.get_all_logical_cores
else:
if args.socket_id != -1:
cores = cpuinfo.get_socket_physical_cores(args.socket_id)
else:
cores = cpuinfo.get_all_physical_cores
if not args.multi_instance and args.ninstances == -1 and args.ncore_per_instance == -1:
args.ninstances = 1
args.ncore_per_instance = len(cores)
elif args.multi_instance and args.ninstances == -1 and args.ncore_per_instance == -1:
args.throughput_performance = True
elif args.ncore_per_instance == -1 and args.ninstances != -1:
args.ncore_per_instance = len(cores) // args.ninstances
elif args.ncore_per_instance != -1 and args.ninstances == -1:
args.ninstances = len(cores) // args.ncore_per_instance
else:
if args.ninstances * args.ncore_per_instance > len(cores):
LOGGER.error("Please make sure ninstances * ncore_per_instance <= total_cores")
exit(-1)
if args.latency_performance:
if args.ncore_per_instance != 4:
LOGGER.warning("latency_performance is a special mode, args.ncore_per_instance can only be set to be 4")
args.ncore_per_instance = 4
cores = cpuinfo.get_all_physical_cores
args.ninstances = len(cores) // args.ncore_per_instance
if args.throughput_performance:
args.ninstances = cpuinfo.socket_nums
cores = cpuinfo.get_all_physical_cores
args.ncore_per_instance = len(cores) // args.ninstances
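    # Record the per-instance launch commands, the THP setting active before the run (restored afterwards
    # when --enable_thp is used) and a random 8-hex-digit experiment id shared by every instance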
os.environ["LAUNCH_CMD"] = "#"
os.environ["LAUNCH_THP"] = get_transparent_huge_pages()
os.environ["EXPERIMENT_ID"] = hexlify(getrandbits(32).to_bytes(4, 'big')).decode('ascii')
set_multi_thread_and_allocator(args)
args.additional_benchmark_args.append(f"num_instances={args.ninstances}")
args.additional_benchmark_args.append(f"num_core_per_instance={args.ncore_per_instance}")
args.additional_benchmark_args.append(f"experiment_id={os.environ['EXPERIMENT_ID']}")
for i in range(args.ninstances):
cmd, instance_specific_args = [], []
instance_specific_args.append(f"instance_id={i}")
if not args.disable_numactl:
instance_cores = cores[i * args.ncore_per_instance:(i + 1) * args.ncore_per_instance]
instance_sockets = cpuinfo.get_sockets_for_cores(instance_cores)
# Convert to numactl string argument
instance_cores_str = ",".join(instance_cores)
instance_sockets_str = ",".join(instance_sockets)
# Generate numactl call
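            # numactl -C pins the instance to its core list and -m binds its allocations to the matching NUMA node(s)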
cmd = ["numactl"]
numa_params = "-C {} ".format(instance_cores_str)
numa_params += "-m {}".format(instance_sockets_str)
cmd.extend(numa_params.split())
instance_specific_args.append(f"+numactl.enabled=true")
instance_specific_args.append(f"+numactl.cores=\"{instance_cores_str}\"")
instance_specific_args.append(f"+numactl.membind=\"{instance_sockets_str}\"")
else:
instance_specific_args.append(f"+numactl.enabled=false")
with_python = not args.no_python
if with_python:
cmd.append(sys.executable)
if args.module:
cmd.append("-m")
if "LD_PRELOAD" in os.environ:
instance_specific_args.append("+ld_preload=\"" + os.environ["LD_PRELOAD"] + "\"")
else:
instance_specific_args.append("+ld_preload=\"\"")
cmd.append(args.program)
cmd.extend(args.program_args)
cmd.extend(args.additional_benchmark_args)
cmd.extend(instance_specific_args)
os.environ["LAUNCH_CMD"] += " ".join(cmd) + ",#"
process = subprocess.Popen(cmd, env=os.environ)
processes.append(process)
os.environ["LAUNCH_CMD"] = os.environ["LAUNCH_CMD"][:-2]
for process in processes:
process.wait()
if process.returncode != 0:
raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd)
if args.enable_thp:
        # restore the THP setting that was active before the run
set_transparent_huge_pages(os.environ["LAUNCH_THP"], SUDOER_PASSWORD)
print(f"Experiment results saved at: {os.path.join('outputs', os.environ['EXPERIMENT_ID'])}")
def mpi_dist_launch(args):
"""
Set ENVs and launch MPI process for distributed training.
"""
if args.nnodes > 1 and not os.path.exists(args.hostfile):
raise ValueError("hostfile is necessary when you use multi-node distributed training,"
"Please create hostfile which include the ip list you used for distributed running")
elif args.nnodes > 1:
ipv4_addr_pattern = r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
ip_list = []
with open(args.hostfile) as f:
for line in f:
                line = line.strip()
is_valid = re.match(ipv4_addr_pattern, line)
if not is_valid:
LOGGER.error(f"{line} is not valid IPV4 address")
exit(-1)
else:
ip_list.append(line)
if len(ip_list) < args.nnodes:
LOGGER.error(f"The number of IP {len(ip_list)} should greater than nnodes parameters {args.nnodes}")
exit(-1)
master_check = False
dic = psutil.net_if_addrs()
for adapter in dic:
snicList = dic[adapter]
for snic in snicList:
if snic.address == ip_list[0]:
master_check = True
if not master_check:
            LOGGER.error(
                f"MASTER_ADDR is incorrect. Please make sure the first IP {ip_list[0]} "
                f"in your hostfile belongs to the current node"
            )
exit(-1)
LOGGER.info("Begin to validate the ip connect")
args.master_addr = ip_list[0]
for ip in ip_list[1:]:
completed_process = subprocess.run("ssh -o PasswordAuthentication=no {} ':'".format(ip), shell=True)
if completed_process.returncode != 0:
                LOGGER.error(
                    f"Password-less SSH login to {ip} failed, "
                    f"please make sure you have set up SSH public keys correctly"
                )
                exit(-1)
            else:
                LOGGER.info("Connection from master node {} to worker node {} is OK".format(args.master_addr, ip))
set_memory_allocator(args)
# set distributed related environmental variables
os.environ["MASTER_ADDR"] = args.master_addr
os.environ["MASTER_PORT"] = str(args.master_port)
if "I_MPI_PIN_DOMAIN" not in os.environ:
mpi_pin_domain = set_mpi_pin_domain(args)
else:
mpi_pin_domain = os.environ["I_MPI_PIN_DOMAIN"]
cpuinfo = CPUinfo()
ppn = args.nproc_per_node
total_cores = len(cpuinfo.get_all_physical_cores)
cores_per_rank = total_cores // ppn
if "OMP_NUM_THREADS" not in os.environ:
        omp_num_threads = cores_per_rank - args.ccl_worker_count
else:
opm_num_threads = os.environ["OMP_NUM_THREADS"]
os.environ["CCL_WORKER_COUNT"] = str(args.ccl_worker_count)
if "CCL_WORKER_AFFINITY" not in os.environ:
set_ccl_worker_affinity(args)
if "CCL_ATL_TRANSPORT" not in os.environ:
os.environ["CCL_ATL_TRANSPORT"] = "ofi"
if args.enable_iomp:
find_iomp = add_lib_preload(lib_type="iomp5")
if not find_iomp:
LOGGER.warning("Unable to find the {} library file lib{}.so in $CONDA_PREFIX/lib or /.local/lib/"
" or /usr/local/lib/ or /usr/local/lib64/ or /usr/lib or /usr/lib64 or "
"~/.local/lib/ so the LD_PRELOAD environment variable will not be set."
.format("iomp", "iomp", expanduser("~")))
else:
LOGGER.info("Enable iomp by set LD_PRELOAD")
LOGGER.info("MASTER_ADDR={}".format(args.master_addr))
LOGGER.info("MASTER_PORT={}".format(args.master_port))
LOGGER.info("I_MPI_PIN_DOMAIN={}".format(mpi_pin_domain))
LOGGER.info("OMP_NUM_THREADS={} ".format(opm_num_threads))
LOGGER.info("CCL_WORKER_COUNT={}".format(args.ccl_worker_count))
LOGGER.info("CCL_WORKER_AFFINITY={}".format(os.environ["CCL_WORKER_AFFINITY"]))
os.environ["LAUNCH_CMD"] = "#"
cmd = ['mpiexec.hydra']
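    # mpiexec.hydra flags: -l prefixes output with the rank, -np is the total number of ranks,
    # -ppn is the number of ranks per node and -genv exports an environment variable to every rank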
mpi_config = "-l -np {} -ppn {} -genv I_MPI_PIN_DOMAIN={} -genv OMP_NUM_THREADS={} ".format(
args.nnodes*args.nproc_per_node, args.nproc_per_node, mpi_pin_domain, opm_num_threads
)
mpi_config += args.more_mpi_parms
if args.nnodes > 1:
mpi_config += " -hostfile {}".format(args.hostfile)
cmd.extend(mpi_config.split())
with_python = not args.no_python
if with_python:
cmd.append(sys.executable)
cmd.append("-u")
if args.module:
cmd.append("-m")
cmd.append(args.program)
cmd.extend(args.program_args)
process = subprocess.Popen(cmd, env=os.environ)
process.wait()
os.environ["LAUNCH_CMD"] += " ".join(cmd) + ",#"
os.environ["LAUNCH_CMD"] = os.environ["LAUNCH_CMD"][:-2]
def add_distributed_training_params(parser):
cpuinfo = CPUinfo()
socket_nums = cpuinfo.socket_nums
group = parser.add_argument_group("Distributed Training Parameters With oneCCL backend")
group.add_argument("--nnodes", metavar='\b', type=int, default=1,
help="The number of nodes to use for distributed "
"training")
group.add_argument("--nproc_per_node", metavar='\b', type=int, default=socket_nums,
help="The number of processes to launch on each node")
# ccl control
group.add_argument("--ccl_worker_count", metavar='\b', default=4, type=int,
help="Core numbers per rank used for ccl communication")
# mpi control
group.add_argument("--master_addr", metavar='\b', default="127.0.0.1", type=str,
help="Master node (rank 0)'s address, should be either "
"the IP address or the hostname of node 0, for "
"single node multi-proc training, the "
"--master_addr can simply be 127.0.0.1")
group.add_argument("--master_port", metavar='\b', default=29500, type=int,
help="Master node (rank 0)'s free port that needs to "
"be used for communication during distributed "
"training")
group.add_argument("--hostfile", metavar='\b', default="hostfile", type=str,
help="Hostfile is necessary for multi-node multi-proc "
"training. hostfile includes the node address list "
"node address which should be either the IP address"
"or the hostname.")
group.add_argument("--more_mpi_parms", metavar='\b', default="", type=str,
help="User can pass more parameters for mpiexec.hydra "
"except for -np -ppn -hostfile and -genv I_MPI_PIN_DOMAIN")
def add_memory_allocator_params(parser):
group = parser.add_argument_group("Memory Allocator Parameters")
# allocator control
group.add_argument("--enable_tcmalloc", action='store_true', default=False,
help="Enable tcmalloc allocator")
group.add_argument("--enable_jemalloc", action='store_true', default=False,
help="Enable jemalloc allocator")
group.add_argument("--use_default_allocator", action='store_true', default=False,
help="Use default memory allocator")
group.add_argument("--malloc_conf", metavar='\b', default="oversize_threshold:1,background_thread:true,metadata_thp:auto,dirty_decay_ms:9000000000,muzzy_decay_ms:9000000000", type=str,
help="MALLOC_CONF setup, for jemalloc only, environment variable has higher priority than this args."
"default value is : oversize_threshold:1,background_thread:true,metadata_thp:auto,dirty_decay_ms:9000000000,muzzy_decay_ms:9000000000")
# transparent huge pages
group.add_argument("--enable_thp", action="store_true", default=False, help="Enable Transparent Huge Pages")
def add_multi_instance_params(parser):
group = parser.add_argument_group("Multi-instance Parameters")
# multi-instance control
group.add_argument("--ncore_per_instance", metavar='\b', default=-1, type=int,
help="Cores per instance")
group.add_argument("--ninstances", metavar='\b', default=-1, type=int,
help="For multi-instance, you should give the cores number you used for per insantance.")
group.add_argument("--latency_performance", action='store_true', default=False,
help="By detault 4 core per instance and use all physical cores")
group.add_argument("--throughput_performance", action='store_true', default=False,
help="By default one instance per socket and use all physical cores")
group.add_argument("--socket_id", metavar='\b', default=-1, type=int,
help="Socket id for multi-instance, by default all sockets will be used")
group.add_argument("--use_logical_core", action='store_true', default=False,
help="Whether only use physical cores")
group.add_argument("--disable_numactl", action='store_true', default=False,
help="Disable numactl")
group.add_argument("--core_list", metavar='\b', default=None, type=str,
help="Specify the core list as 'core_id, core_id, ....', otherwise, all the cores will be used.")
def add_kmp_iomp_params(parser):
group = parser.add_argument_group("KMP/IOMP Affinity Parameters")
group.add_argument("--kmp_affinity", metavar='\b', default="granularity=fine,compact,1,0", type=str,
help="KMP_AFFINITY setup, environment variable has higher priority than this args."
"default value is : granularity=fine,compact,1,0")
group.add_argument("--kmp_blocktime", metavar='\b', default="1", type=str,
help="KMP_BLOCKTIME setup, environment variable has higher priority than this args."
"default value is : 1")
group.add_argument("--omp_max_active_levels", type=int, default=1, help="Set OMP_MAX_ACTIVE_LEVELS env var.")
group.add_argument("--enable_iomp", action='store_true', default=False,
help="Enable iomp and libiomp5.so will be add to LD_PRELOAD")
def parse_system_info(args):
from platform import libc_ver, uname
uname_info = uname()
args.additional_benchmark_args.append(f"+system.name={uname_info.system}")
args.additional_benchmark_args.append(f"+system.arch={uname_info.machine}")
args.additional_benchmark_args.append(f"+system.kernel={uname_info.release}")
args.additional_benchmark_args.append(f"+system.libc={libc_ver()[-1]}")
def parse_args():
"""
Helper function parsing the command line options
@retval ArgumentParser
"""
parser = ArgumentParser(description="This is a script for launching PyTorch training and inference on Intel Xeon CPU "
"with optimal configurations. Now, single instance inference/training, multi-instance "
"inference/training and distributed training with oneCCL backend is enabled. "
"To get the peak performance on Intel Xeon CPU, the script optimizes the configuration "
"of thread and memory management. For thread management, the script configures thread "
"affinity and the preload of Intel OMP library. For memory management, it configures "
"NUMA binding and preload optimized memory allocation library (e.g. tcmalloc, jemalloc) "
"\n################################# Basic usage ############################# \n"
"\n 1. single instance\n"
"\n >>> python -m intel_pytorch_extension.launch python_script args \n"
"\n2. multi-instance \n"
"\n >>> python -m intel_pytorch_extension.launch --multi_instance python_script args\n"
"\n3. Single-Node multi-process distributed training\n"
"\n >>> python -m intel_pytorch_extension.launch --distributed python_script args\n"
"\n4. Multi-Node multi-process distributed training: (e.g. two nodes)\n"
"\n rank 0: *(IP: 192.168.10.10, and has a free port: 295000)*\n"
"\n >>> python -m intel_pytorch_extension.launch --distributed --nproc_per_node=2\n"
"\n --nnodes=2 --hostfile hostfile python_script args\n",
formatter_class=RawTextHelpFormatter)
parser.add_argument("--multi_instance", action='store_true', default=False,
help="Enable multi-instance, by default one instance per socket")
parser.add_argument('--distributed', action='store_true', default=False,
help='Enable distributed training.')
parser.add_argument("-m", "--module", default=False, action="store_true",
help="Changes each process to interpret the launch script "
"as a python module, executing with the same behavior as"
"'python -m'.")
parser.add_argument("--no_python", default=False, action="store_true",
help="Do not prepend the --program script with \"python\" - just exec "
"it directly. Useful when the script is not a Python script.")
add_memory_allocator_params(parser)
add_kmp_iomp_params(parser)
add_distributed_training_params(parser)
add_multi_instance_params(parser)
# positional
parser.add_argument("program", type=str,
help="The full path to the proram/script to be launched. "
"followed by all the arguments for the script")
# rest from the training program
parser.add_argument('program_args', nargs=REMAINDER)
return parser.parse_args()
def main():
env_before = set(os.environ.keys())
if platform.system() == "Windows":
raise RuntimeError("Windows platform is not supported!!!")
args = parse_args()
args.additional_benchmark_args = []
parse_system_info(args)
if args.distributed and args.multi_instance:
raise RuntimeError("Either args.distributed or args.multi_instance should be set")
if args.latency_performance and args.throughput_performance:
raise RuntimeError("Either args.latency_performance or args.throughput_performance should be set")
if args.nnodes > 1:
args.distributed = True
if args.distributed:
mpi_dist_launch(args)
else:
launch(args)
for x in sorted(set(os.environ.keys()) - env_before):
LOGGER.debug(f'{x}={os.environ[x]}')
if __name__ == "__main__":
main()