14
14
Zurich.
15
15
"""
16
16
import asyncio
17
+ import logging
17
18
from functools import partial
18
19
from functools import wraps
19
20
from typing import Callable
20
21
21
- import logging
22
-
23
22
import parsl
24
23
from parsl .addresses import address_by_hostname
25
24
from parsl .channels import LocalChannel
@@ -81,28 +80,29 @@ def load_parsl_config(
81
80
raise NotImplementedError
82
81
83
82
if config == "local" :
84
-
85
83
# Define a single provider
86
- prov = LocalProvider (
84
+ prov_local = LocalProvider (
87
85
launcher = SingleNodeLauncher (debug = False ),
88
86
channel = LocalChannel (),
89
87
init_blocks = 1 ,
90
88
min_blocks = 0 ,
91
89
max_blocks = 4 ,
92
90
)
93
-
94
- # Define two identical (apart from the label) executors
95
- htex = HighThroughputExecutor (
96
- label = add_prefix (workflow_id = workflow_id , executor_label = "cpu-low" ),
97
- provider = prov ,
98
- address = address_by_hostname (),
99
- )
100
- htex_2 = HighThroughputExecutor (
101
- label = add_prefix (workflow_id = workflow_id , executor_label = "cpu-2" ),
102
- provider = prov ,
103
- address = address_by_hostname (),
104
- )
105
- executors = [htex , htex_2 ]
91
+ # Define executors
92
+ providers = [prov_local ] * 4
93
+ labels = ["cpu-low" , "cpu-mid" , "cpu-high" , "gpu" ]
94
+ executors = []
95
+ for provider , label in zip (providers , labels ):
96
+ executors .append (
97
+ HighThroughputExecutor (
98
+ label = add_prefix (
99
+ workflow_id = workflow_id , executor_label = label
100
+ ),
101
+ provider = provider ,
102
+ address = address_by_hostname (),
103
+ cpu_affinity = "block" ,
104
+ )
105
+ )
106
106
107
107
elif config == "pelkmanslab" :
108
108
@@ -154,14 +154,18 @@ def load_parsl_config(
154
154
providers = [prov_cpu_low , prov_cpu_mid , prov_cpu_high , prov_gpu ]
155
155
labels = ["cpu-low" , "cpu-mid" , "cpu-high" , "gpu" ]
156
156
# FIXME
157
- list_mem_per_worker = [7 , 15 , 63 , 63 ] # FIXME
157
+ list_mem_per_worker = [7 , 15 , 63 , 63 ] # FIXME
158
158
executors = []
159
159
for provider , label in zip (providers , labels ):
160
160
executors .append (
161
161
HighThroughputExecutor (
162
- label = add_prefix (workflow_id = workflow_id , executor_label = label ),
162
+ label = add_prefix (
163
+ workflow_id = workflow_id , executor_label = label
164
+ ),
163
165
provider = provider ,
164
- mem_per_worker = list_mem_per_worker [labels .index (label )], # FIXME
166
+ mem_per_worker = list_mem_per_worker [
167
+ labels .index (label )
168
+ ], # FIXME
165
169
max_workers = 100 ,
166
170
address = address_by_hostname (),
167
171
cpu_affinity = "block" ,
@@ -188,7 +192,9 @@ def load_parsl_config(
188
192
189
193
# Define executors
190
194
htex_slurm_cpu = HighThroughputExecutor (
191
- label = add_prefix (workflow_id = workflow_id , executor_label = "cpu-low" ),
195
+ label = add_prefix (
196
+ workflow_id = workflow_id , executor_label = "cpu-low"
197
+ ),
192
198
provider = prov_slurm_cpu ,
193
199
address = address_by_hostname (),
194
200
cpu_affinity = "block" ,
@@ -253,7 +259,11 @@ def load_parsl_config(
253
259
)
254
260
255
261
256
- def shutdown_executors (* , workflow_id : str ):
262
+ def shutdown_executors (* , workflow_id : str , logger : logging .Logger = None ):
263
+
264
+ if logger is None :
265
+ logger = logging .getLogger ("logs" )
266
+
257
267
# Remove executors from parsl DFK
258
268
# FIXME decorate with monitoring logs, as in:
259
269
# https://github.com/Parsl/parsl/blob/master/parsl/dataflow/dflow.py#L1106
0 commit comments