25
25
from typing_extensions import TYPE_CHECKING
26
26
27
27
import cwltool .workflow
28
+ from . import process
28
29
29
30
from .errors import WorkflowException
30
31
from .job import CommandLineJob , JobBase
57
58
58
59
59
60
def copy_job_order (
60
- job : Union [Process , JobsType ], job_order_object : CWLObjectType
61
+ job : Union [Process , JobsType ], job_order_object : CWLObjectType , process
61
62
) -> CWLObjectType :
62
63
"""Create copy of job object for provenance."""
63
64
if not isinstance (job , WorkflowJob ):
@@ -66,12 +67,34 @@ def copy_job_order(
66
67
customised_job : CWLObjectType = {}
67
68
# new job object for RO
68
69
debug = _logger .isEnabledFor (logging .DEBUG )
70
+ # Process the process object first
71
+ load_listing = {}
72
+
73
+ # Implementation to capture the loadlisting from cwl to skip the inclusion of for example files of big database
74
+ # folders
75
+ for index , entry in enumerate (process .inputs_record_schema ["fields" ]):
76
+ if (
77
+ entry ["type" ] == "org.w3id.cwl.cwl.Directory"
78
+ and "loadListing" in entry
79
+ and entry ["loadListing" ]
80
+ ):
81
+ load_listing [entry ["name" ]] = entry ["loadListing" ]
82
+
83
+ # print("LOAD LISTING: ", load_listing)
84
+ # PROCESS:Workflow: file:///Users/jasperk/gitlab/cwltool/tests/wf/directory_no_listing.cwl
85
+ # print("PROCESS:" + str(process))
86
+
69
87
for each , i in enumerate (job .tool ["inputs" ]):
70
88
with SourceLine (job .tool ["inputs" ], each , WorkflowException , debug ):
71
89
iid = shortname (i ["id" ])
90
+ # if iid in the load listing object and no_listing then....
72
91
if iid in job_order_object :
73
- customised_job [iid ] = copy .deepcopy (job_order_object [iid ])
74
- # add the input element in dictionary for provenance
92
+ if iid in load_listing and load_listing [iid ] != "no_listing" :
93
+ customised_job [iid ] = copy .deepcopy (job_order_object [iid ])
94
+ # TODO Other listing options here?
95
+ else :
96
+ # add the input element in dictionary for provenance
97
+ customised_job [iid ] = copy .deepcopy (job_order_object [iid ])
75
98
elif "default" in i :
76
99
customised_job [iid ] = copy .deepcopy (i ["default" ])
77
100
# add the default elements in the dictionary for provenance
@@ -246,13 +269,13 @@ def evaluate(
246
269
if not hasattr (process , "steps" ):
247
270
# record provenance of independent commandline tool executions
248
271
self .prospective_prov (job )
249
- customised_job = copy_job_order (job , job_order_object )
272
+ customised_job = copy_job_order (job , job_order_object , process )
250
273
self .used_artefacts (customised_job , self .workflow_run_uri )
251
274
research_obj .create_job (customised_job )
252
275
elif hasattr (job , "workflow" ):
253
276
# record provenance of workflow executions
254
277
self .prospective_prov (job )
255
- customised_job = copy_job_order (job , job_order_object )
278
+ customised_job = copy_job_order (job , job_order_object , process )
256
279
self .used_artefacts (customised_job , self .workflow_run_uri )
257
280
258
281
def record_process_start (
@@ -472,8 +495,11 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
472
495
# a later call to this method will sort that
473
496
is_empty = True
474
497
498
+ # if value['basename'] == "dirIgnore":
499
+ # pass
475
500
if "listing" not in value :
476
501
get_listing (self .fsaccess , value )
502
+
477
503
for entry in cast (MutableSequence [CWLObjectType ], value .get ("listing" , [])):
478
504
is_empty = False
479
505
# Declare child-artifacts
0 commit comments