@@ -61,7 +61,9 @@
 from pytorch_lightning.utilities import (
     _APEX_AVAILABLE,
     _HOROVOD_AVAILABLE,
+    _IPU_AVAILABLE,
     _NATIVE_AMP_AVAILABLE,
+    _TPU_AVAILABLE,
     AMPType,
     device_parser,
     DeviceType,
@@ -101,6 +103,7 @@ def __init__(
         # initialization
         self._device_type = DeviceType.CPU
         self._distrib_type = None
+        self._accelerator_type = None
 
         self.num_processes = num_processes
         # `gpus` is the input passed to the Trainer, whereas `gpu_ids` is a list of parsed gpu ids.
@@ -133,16 +136,19 @@ def __init__(
 
         self.plugins = plugins
 
+        self.select_accelerator_type()
         self.set_distributed_mode()
         self.configure_slurm_ddp()
 
         self.handle_given_plugins()
+        self.update_device_type_if_ipu_plugin()
+        self.validate_accelerator_type()
 
         self._training_type_plugin_resolved = False
         self.accelerator = self.select_accelerator()
 
         # override dist backend when using tpus
-        if self.on_tpu:
+        if self.use_tpu:
             self.distributed_backend = "tpu"
 
         # init flags for SLURM+DDP to work
@@ -164,6 +170,45 @@ def __init__(
 
         self.replace_sampler_ddp = replace_sampler_ddp
 
+    def select_accelerator_type(self) -> None:
+        if self.distributed_backend == "auto":
+            if self.has_tpu:
+                self._accelerator_type = DeviceType.TPU
+            elif self.has_ipu:
+                self._accelerator_type = DeviceType.IPU
+            elif self.has_gpu:
+                self._accelerator_type = DeviceType.GPU
+            else:
+                self._accelerator_type = DeviceType.CPU
+        elif self.distributed_backend == DeviceType.TPU:
+            if not self.has_tpu:
+                msg = "TPUs are not available" if not _TPU_AVAILABLE else "you didn't pass `tpu_cores` to `Trainer`"
+                raise MisconfigurationException(f"You passed `accelerator='tpu'`, but {msg}.")
+            self._accelerator_type = DeviceType.TPU
+        elif self.distributed_backend == DeviceType.IPU:
+            if not self.has_ipu:
+                msg = "IPUs are not available" if not _IPU_AVAILABLE else "you didn't pass `ipus` to `Trainer`"
+                raise MisconfigurationException(f"You passed `accelerator='ipu'`, but {msg}.")
+            self._accelerator_type = DeviceType.IPU
+        elif self.distributed_backend == DeviceType.GPU:
+            if not self.has_gpu:
+                msg = ("you didn't pass `gpus` to `Trainer`" if torch.cuda.is_available() else "GPUs are not available")
+                raise MisconfigurationException(f"You passed `accelerator='gpu'`, but {msg}.")
+            self._accelerator_type = DeviceType.GPU
+        elif self.distributed_backend == DeviceType.CPU:
+            self._accelerator_type = DeviceType.CPU
+
+        if self.distributed_backend in ["auto"] + list(DeviceType):
+            self.distributed_backend = None
+
+    def validate_accelerator_type(self) -> None:
+        if self._accelerator_type and self._accelerator_type != self._device_type:
+            raise MisconfigurationException(
+                f"Mismatch between the requested accelerator type ({self._accelerator_type})"
+                f" and assigned device type ({self._device_type})."
+            )
+        self._accelerator_type = self._device_type
+
     def handle_given_plugins(self) -> None:
 
         training_type = None
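
Reviewer note: a minimal, self-contained sketch of the precedence the new `select_accelerator_type` applies for `accelerator="auto"` (TPU first, then IPU, GPU, CPU). This is an illustration only, not the connector code; the `has_*` arguments stand in for the connector's properties of the same names.

from enum import Enum

class DeviceType(str, Enum):
    CPU = "cpu"
    GPU = "gpu"
    IPU = "ipu"
    TPU = "tpu"

def pick_accelerator(has_tpu: bool, has_ipu: bool, has_gpu: bool) -> DeviceType:
    # same ordering as the `auto` branch above: TPU > IPU > GPU > CPU
    if has_tpu:
        return DeviceType.TPU
    if has_ipu:
        return DeviceType.IPU
    if has_gpu:
        return DeviceType.GPU
    return DeviceType.CPU

assert pick_accelerator(False, False, True) is DeviceType.GPU
assert pick_accelerator(False, False, False) is DeviceType.CPU
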
@@ -245,28 +290,49 @@ def cluster_environment(self) -> ClusterEnvironment:
         return self._cluster_environment
 
     @property
-    def on_cpu(self) -> bool:
-        return self._device_type == DeviceType.CPU
+    def has_cpu(self) -> bool:
+        return True
+
+    @property
+    def use_cpu(self) -> bool:
+        return self._accelerator_type == DeviceType.CPU
+
+    @property
+    def has_gpu(self) -> bool:
+        # Here, we are not checking for GPU availability, but instead if User has passed
+        # `gpus` to Trainer for training.
+        gpus = self.parallel_device_ids
+        return gpus is not None and len(gpus) > 0
+
+    @property
+    def use_gpu(self) -> bool:
+        return self._accelerator_type == DeviceType.GPU and self.has_gpu
 
     @property
-    def on_tpu(self) -> bool:
+    def has_tpu(self) -> bool:
+        # Here, we are not checking for TPU availability, but instead if User has passed
+        # `tpu_cores` to Trainer for training.
         return self.tpu_cores is not None
 
     @property
-    def on_ipu(self) -> bool:
-        return self.ipus is not None or isinstance(self._training_type_plugin, IPUPlugin)
+    def use_tpu(self) -> bool:
+        return self._accelerator_type == DeviceType.TPU and self.has_tpu
 
     @property
     def tpu_id(self) -> Optional[int]:
-        if self.on_tpu and isinstance(self.tpu_cores, list):
+        if self.use_tpu and isinstance(self.tpu_cores, list):
             return self.tpu_cores[0]
-
         return None
 
     @property
-    def on_gpu(self) -> bool:
-        gpus = self.parallel_device_ids
-        return gpus is not None and len(gpus) > 0 and torch.cuda.is_available()
+    def has_ipu(self) -> bool:
+        # Here, we are not checking for IPU availability, but instead if User has passed
+        # `ipus` to Trainer for training.
+        return self.ipus is not None or isinstance(self._training_type_plugin, IPUPlugin)
+
+    @property
+    def use_ipu(self) -> bool:
+        return self._accelerator_type == DeviceType.IPU and self.has_ipu
 
     @property
     def use_dp(self) -> bool:
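
Reviewer note: the renamed properties separate "requested" from "selected": `has_x` only reflects that the user asked for that hardware (e.g. passed `gpus`), while `use_x` additionally requires that the resolved accelerator type is `x`. A tiny standalone illustration with a hypothetical stand-in class (not the connector itself):

from typing import List, Optional

class Flags:
    # hypothetical stand-in for the connector's state
    def __init__(self, gpu_ids: Optional[List[int]], accelerator_type: str):
        self.parallel_device_ids = gpu_ids
        self._accelerator_type = accelerator_type

    @property
    def has_gpu(self) -> bool:
        # user passed `gpus`; says nothing about what was selected
        return self.parallel_device_ids is not None and len(self.parallel_device_ids) > 0

    @property
    def use_gpu(self) -> bool:
        # selected accelerator must be GPU *and* GPUs must have been requested
        return self._accelerator_type == "gpu" and self.has_gpu

f = Flags(gpu_ids=[0, 1], accelerator_type="tpu")
assert f.has_gpu and not f.use_gpu  # GPUs requested, but TPU selected
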
@@ -308,10 +374,10 @@ def _is_fully_sharded_training_type(self) -> bool:
     def is_distributed(self) -> bool:
         # Used for custom plugins.
         # Custom plugins should implement is_distributed property.
-        if hasattr(self.training_type_plugin, 'is_distributed') and not self.on_tpu:
+        if hasattr(self.training_type_plugin, 'is_distributed') and not self.use_tpu:
             return self.training_type_plugin.is_distributed
         is_distributed = self.use_ddp or self.use_ddp2 or self.use_horovod
-        if self.on_tpu:
+        if self.use_tpu:
             is_distributed |= self.training_type_plugin.is_distributed
         return is_distributed
 
@@ -332,14 +398,14 @@ def num_ipus(self) -> int:
 
     @property
     def parallel_devices(self) -> List[Union[torch.device, int]]:
-        if self.on_gpu:
+        if self.use_gpu:
             devices = [torch.device("cuda", i) for i in self.parallel_device_ids]
-        elif self.on_tpu:
+        elif self.use_tpu:
             # explicitly don't make a tpu device here!
             # https://github.com/PyTorchLightning/pytorch-lightning/issues/3169
             if isinstance(self.tpu_cores, int):
                 devices = list(range(self.tpu_cores))
-        elif self.on_ipu:
+        elif self.use_ipu:
             devices = list(range(self.num_ipus))
         else:
             devices = [torch.device("cpu")] * self.num_processes
@@ -373,7 +439,7 @@ def select_precision_plugin(self) -> PrecisionPlugin:
         # set precision type
         self.amp_type = AMPType.from_str(self.amp_type)
 
-        if self.on_ipu:
+        if self.use_ipu:
             return IPUPrecisionPlugin(self.precision)
 
         if self._distrib_type == DistributedType.DEEPSPEED or isinstance(self._training_type_plugin, DeepSpeedPlugin):
@@ -384,11 +450,11 @@ def select_precision_plugin(self) -> PrecisionPlugin:
         if self.precision == 64:
             return DoublePrecisionPlugin()
         if self.precision == 16:
-            if self.on_tpu:
+            if self.use_tpu:
                 return TPUHalfPrecisionPlugin()
 
             if self.amp_type == AMPType.NATIVE:
-                if self.on_cpu:
+                if self.use_cpu:
                     raise MisconfigurationException(
                         "You have asked for native AMP on CPU, but AMP is only available on GPU."
                     )
@@ -444,8 +510,8 @@ def select_training_type_plugin(self) -> TrainingTypePlugin:
         use_torchelastic_ddp = self.use_ddp and TorchElasticEnvironment.is_using_torchelastic()
         use_kubeflow_ddp = self.use_ddp and KubeflowEnvironment.is_using_kubeflow()
         use_ddp_spawn = self._distrib_type == DistributedType.DDP_SPAWN
-        use_ddp_cpu_spawn = self.use_ddp and self.on_cpu
-        use_tpu_spawn = self.on_tpu and self._distrib_type == DistributedType.TPU_SPAWN
+        use_ddp_cpu_spawn = self.use_ddp and self.use_cpu
+        use_tpu_spawn = self.use_tpu and self._distrib_type == DistributedType.TPU_SPAWN
         use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and TorchElasticEnvironment.is_using_torchelastic()
         use_ddp_cpu_kubeflow = use_ddp_cpu_spawn and KubeflowEnvironment.is_using_kubeflow()
         use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.is_slurm_managing_tasks
@@ -484,13 +550,13 @@ def select_training_type_plugin(self) -> TrainingTypePlugin:
             plugin = DataParallelPlugin(parallel_devices=self.parallel_devices)
         elif self.use_horovod:
             plugin = HorovodPlugin(parallel_devices=self.parallel_devices)
-        elif self.on_tpu and isinstance(self.tpu_cores, list):
+        elif self.use_tpu and isinstance(self.tpu_cores, list):
             plugin = SingleTPUPlugin(self.tpu_id)
-        elif self.on_ipu:
+        elif self.use_ipu:
             plugin = IPUPlugin(parallel_devices=self.parallel_devices)
         else:
             single_gpu_ordinal = device_parser.determine_root_gpu_device(self.parallel_device_ids)
-            plugin = SingleDevicePlugin(device=torch.device(f"cuda:{single_gpu_ordinal}" if self.on_gpu else "cpu"))
+            plugin = SingleDevicePlugin(device=torch.device(f"cuda:{single_gpu_ordinal}" if self.use_gpu else "cpu"))
         return plugin
 
     def resolve_training_type_plugin(self, training_type: TrainingTypePlugin) -> TrainingTypePlugin:
@@ -526,11 +592,11 @@ def select_accelerator(self) -> Accelerator:
             )
             return self.distributed_backend
 
-        if self.on_gpu:
+        if self.use_gpu:
             acc_cls = GPUAccelerator
-        elif self.on_tpu:
+        elif self.use_tpu:
             acc_cls = TPUAccelerator
-        elif self.on_ipu:
+        elif self.use_ipu:
             acc_cls = IPUAccelerator
         else:
             acc_cls = CPUAccelerator
@@ -574,12 +640,15 @@ def set_distributed_mode(self, distributed_backend: Optional[str] = None):
         if isinstance(self.distributed_backend, Accelerator):
             return
 
+        is_cpu_accelerator_type = self._accelerator_type and self._accelerator_type == DeviceType.CPU
+        _use_cpu = is_cpu_accelerator_type or self.distributed_backend and 'cpu' in self.distributed_backend
+
         if self.distributed_backend is None:
             if self.has_horovodrun():
                 self._set_horovod_backend()
             elif self.num_gpus == 0 and (self.num_nodes > 1 or self.num_processes > 1):
                 self._distrib_type = DistributedType.DDP
-            elif self.num_gpus > 1:
+            elif self.num_gpus > 1 and not _use_cpu:
                 rank_zero_warn(
                     'You requested multiple GPUs but did not specify a backend, e.g.'
                     ' `Trainer(accelerator="dp"|"ddp"|"ddp2")`. Setting `accelerator="ddp_spawn"` for you.'
@@ -598,23 +667,21 @@ def set_distributed_mode(self, distributed_backend: Optional[str] = None):
                 # define the max CPU available
                 self.num_processes = os.cpu_count()
         # special case with TPUs
-        elif self.distributed_backend == 'tpu' or self.tpu_cores is not None:
+        elif self.has_tpu and not _use_cpu:
             self._device_type = DeviceType.TPU
             if isinstance(self.tpu_cores, int):
                 self._distrib_type = DistributedType.TPU_SPAWN
-        elif self.distributed_backend == 'ipu':
+        elif self.has_ipu and not _use_cpu:
             self._device_type = DeviceType.IPU
         elif self.distributed_backend and self._distrib_type is None:
             self._distrib_type = DistributedType(self.distributed_backend)
 
-        # unless you request explicitly for CPU and some GPU are available use them
-        _on_cpu = self.distributed_backend and 'cpu' in self.distributed_backend
-        if self.num_gpus > 0 and not _on_cpu:
+        if self.num_gpus > 0 and not _use_cpu:
             self._device_type = DeviceType.GPU
 
         _gpu_distrib_types = (DistributedType.DP, DistributedType.DDP, DistributedType.DDP_SPAWN, DistributedType.DDP2)
         # DP and DDP2 cannot run without GPU
-        if self.num_gpus == 0 and self._distrib_type in _gpu_distrib_types and not _on_cpu:
+        if self.num_gpus == 0 and self._distrib_type in _gpu_distrib_types and not _use_cpu:
             rank_zero_warn(
                 'You requested distributed training on GPUs, but none is available, so we set backend to `ddp_cpu`.'
             )
@@ -656,7 +723,7 @@ def _set_horovod_backend(self):
 
         # Initialize Horovod to get rank / size info
         hvd.init()
-        if self.on_gpu:
+        if self.has_gpu:
             # Horovod assigns one local GPU per process
             self.parallel_device_ids = list(range(hvd.local_size()))
         else:
@@ -694,6 +761,12 @@ def has_horovodrun() -> bool:
         """Returns True if running with `horovodrun` using Gloo or OpenMPI."""
         return "OMPI_COMM_WORLD_RANK" in os.environ or "HOROVOD_RANK" in os.environ
 
+    def update_device_type_if_ipu_plugin(self) -> None:
+        # This allows the poptorch.Options that are passed into the IPUPlugin to be the source of truth,
+        # which gives users the flexibility to not have to pass `ipus` flag directly to Trainer
+        if isinstance(self._training_type_plugin, IPUPlugin) and self._device_type != DeviceType.IPU:
+            self._device_type = DeviceType.IPU
+
     def configure_slurm_ddp(self):
         # extract SLURM flag vars
         # whenever we have the correct number of tasks, we let slurm manage processes
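
From the user's side, the new validation surfaces as a `MisconfigurationException` at `Trainer` construction time. A hedged usage sketch based on the error strings in this diff (the exact message depends on whether CUDA is available on the machine, and on the rest of this PR):

from pytorch_lightning import Trainer
from pytorch_lightning.utilities.exceptions import MisconfigurationException

try:
    Trainer(accelerator="gpu")  # `accelerator='gpu'` without passing `gpus=...`
except MisconfigurationException as err:
    # e.g. "You passed `accelerator='gpu'`, but you didn't pass `gpus` to `Trainer`."
    print(err)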