@@ -89,8 +89,15 @@ def copy_job_order(
89
89
iid = shortname (i ["id" ])
90
90
# if iid in the load listing object and no_listing then....
91
91
if iid in job_order_object :
92
- if iid in load_listing and load_listing [iid ] != "no_listing" :
93
- customised_job [iid ] = copy .deepcopy (job_order_object [iid ])
92
+ if iid in load_listing :
93
+ if load_listing [iid ] == "no_listing" :
94
+ _logger .warning ("Skip listing of " + iid )
95
+ job_order_object [iid ]['loadListing' ] = 'no_listing'
96
+ job_order_object [iid ]['listing' ] = []
97
+ customised_job [iid ] = job_order_object [iid ]
98
+ else :
99
+ # Normal deep copy
100
+ customised_job [iid ] = copy .deepcopy (job_order_object [iid ])
94
101
# TODO Other listing options here?
95
102
else :
96
103
# add the input element in dictionary for provenance
@@ -270,13 +277,14 @@ def evaluate(
270
277
# record provenance of independent commandline tool executions
271
278
self .prospective_prov (job )
272
279
customised_job = copy_job_order (job , job_order_object , process )
273
- self .used_artefacts (customised_job , self .workflow_run_uri )
280
+ self .used_artefacts (customised_job , self .workflow_run_uri , load_listing = job .builder .loadListing ) # keyword: the 3rd positional slot of used_artefacts is `name`
274
281
research_obj .create_job (customised_job )
275
282
elif hasattr (job , "workflow" ):
276
283
# record provenance of workflow executions
277
284
self .prospective_prov (job )
278
285
customised_job = copy_job_order (job , job_order_object , process )
279
- self .used_artefacts (customised_job , self .workflow_run_uri )
286
+ self .used_artefacts (customised_job , self .workflow_run_uri , schema = process .inputs_record_schema )
287
+
280
288
281
289
def record_process_start (
282
290
self , process : Process , job : JobsType , process_run_id : Optional [str ] = None
@@ -355,11 +363,12 @@ def record_process_end(
355
363
process_run_id : str ,
356
364
outputs : Union [CWLObjectType , MutableSequence [CWLObjectType ], None ],
357
365
when : datetime .datetime ,
366
+ load_listing : str = "deep_listing" ,
358
367
) -> None :
359
- self .generate_output_prov (outputs , process_run_id , process_name )
368
+ self .generate_output_prov (outputs , process_run_id , process_name , load_listing )
360
369
self .document .wasEndedBy (process_run_id , None , self .workflow_run_uri , when )
361
370
362
- def declare_file (self , value : CWLObjectType ) -> Tuple [ProvEntity , ProvEntity , str ]:
371
+ def declare_file (self , value : CWLObjectType , load_listing : str = "deep_listing" ) -> Tuple [ProvEntity , ProvEntity , str ]:
363
372
if value ["class" ] != "File" :
364
373
raise ValueError ("Must have class:File: %s" % value )
365
374
# Need to determine file hash aka RO filename
@@ -436,9 +445,9 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
436
445
):
437
446
# TODO: Record these in a specializationOf entity with UUID?
438
447
if sec ["class" ] == "File" :
439
- (sec_entity , _ , _ ) = self .declare_file (sec )
448
+ (sec_entity , _ , _ ) = self .declare_file (sec , load_listing )
440
449
elif sec ["class" ] == "Directory" :
441
- sec_entity = self .declare_directory (sec )
450
+ sec_entity = self .declare_directory (sec , load_listing )
442
451
else :
443
452
raise ValueError (f"Got unexpected secondaryFiles value: { sec } " )
444
453
# We don't know how/when/where the secondary file was generated,
@@ -453,7 +462,7 @@ def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, st
453
462
454
463
return file_entity , entity , checksum
455
464
456
- def declare_directory (self , value : CWLObjectType ) -> ProvEntity :
465
+ def declare_directory (self , value : CWLObjectType , load_listing : str = "deep_listing" ) -> ProvEntity :
457
466
"""Register any nested files/directories."""
458
467
# FIXME: Calculate a hash-like identifier for directory
459
468
# so we get same value if it's the same filenames/hashes
@@ -498,12 +507,19 @@ def declare_directory(self, value: CWLObjectType) -> ProvEntity:
498
507
# if value['basename'] == "dirIgnore":
499
508
# pass
500
509
if "listing" not in value :
501
- get_listing (self .fsaccess , value )
510
+ if load_listing == "no_listing" :
511
+ pass
512
+ elif load_listing == "deep_listing" :
513
+ get_listing (self .fsaccess , value )
514
+ elif load_listing == "shallow_listing" :
515
+ get_listing (self .fsaccess , value , False )
516
+ else :
517
+ raise ValueError ("Invalid listing value: %s" , load_listing )
502
518
503
519
for entry in cast (MutableSequence [CWLObjectType ], value .get ("listing" , [])):
504
520
is_empty = False
505
521
# Declare child-artifacts
506
- entity = self .declare_artefact (entry )
522
+ entity = self .declare_artefact (entry , load_listing )
507
523
self .document .membership (coll , entity )
508
524
# Membership relation aka our ORE Proxy
509
525
m_id = uuid .uuid4 ().urn
@@ -573,7 +589,7 @@ def declare_string(self, value: str) -> Tuple[ProvEntity, str]:
573
589
)
574
590
return entity , checksum
575
591
576
- def declare_artefact (self , value : Any ) -> ProvEntity :
592
+ def declare_artefact (self , value : Any , load_listing : str = "deep_listing" ) -> ProvEntity :
577
593
"""Create data artefact entities for all file objects."""
578
594
if value is None :
579
595
# FIXME: If this can happen in CWL, we'll
@@ -615,12 +631,12 @@ def declare_artefact(self, value: Any) -> ProvEntity:
615
631
616
632
# Base case - we found a File we need to update
617
633
if value .get ("class" ) == "File" :
618
- (entity , _ , _ ) = self .declare_file (value )
634
+ (entity , _ , _ ) = self .declare_file (value , load_listing )
619
635
value ["@id" ] = entity .identifier .uri
620
636
return entity
621
637
622
638
if value .get ("class" ) == "Directory" :
623
- entity = self .declare_directory (value )
639
+ entity = self .declare_directory (value , load_listing )
624
640
value ["@id" ] = entity .identifier .uri
625
641
return entity
626
642
coll_id = value .setdefault ("@id" , uuid .uuid4 ().urn )
@@ -643,7 +659,7 @@ def declare_artefact(self, value: Any) -> ProvEntity:
643
659
# Let's iterate and recurse
644
660
coll_attribs : List [Tuple [Union [str , Identifier ], Any ]] = []
645
661
for (key , val ) in value .items ():
646
- v_ent = self .declare_artefact (val )
662
+ v_ent = self .declare_artefact (val , load_listing )
647
663
self .document .membership (coll , v_ent )
648
664
m_entity = self .document .entity (uuid .uuid4 ().urn )
649
665
# Note: only support PROV-O style dictionary
@@ -664,7 +680,7 @@ def declare_artefact(self, value: Any) -> ProvEntity:
664
680
members = []
665
681
for each_input_obj in iter (value ):
666
682
# Recurse and register any nested objects
667
- e = self .declare_artefact (each_input_obj )
683
+ e = self .declare_artefact (each_input_obj , load_listing )
668
684
members .append (e )
669
685
670
686
# If we reached this, then we were allowed to iterate
@@ -698,20 +714,31 @@ def used_artefacts(
698
714
job_order : Union [CWLObjectType , List [CWLObjectType ]],
699
715
process_run_id : str ,
700
716
name : Optional [str ] = None ,
717
+ schema : Any = None ,
718
+ load_listing : Optional [str ] = None ,
701
719
) -> None :
702
720
"""Add used() for each data artefact."""
703
721
if isinstance (job_order , list ):
704
722
for entry in job_order :
705
- self .used_artefacts (entry , process_run_id , name )
723
+ # for field in schema.fields:
724
+ # if field['name'] == entry.
725
+ # load_listing = schema.fields
726
+ self .used_artefacts (entry , process_run_id , name , schema , load_listing ) # forward schema too; the 4th positional slot is `schema`, not `load_listing`
706
727
else :
707
728
# FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
708
729
base = "main"
709
730
if name is not None :
710
731
base += "/" + name
711
732
for key , value in job_order .items ():
712
733
prov_role = self .wf_ns [f"{ base } /{ key } " ]
734
+ if not load_listing :
735
+ load_listing = "deep_listing"
736
+ for field in schema ['fields' ]:
737
+ if field ['name' ] == key :
738
+ load_listing = field ['loadListing' ]
739
+ break
713
740
try :
714
- entity = self .declare_artefact (value )
741
+ entity = self .declare_artefact (value , load_listing )
715
742
self .document .used (
716
743
process_run_id ,
717
744
entity ,
@@ -727,11 +754,12 @@ def generate_output_prov(
727
754
final_output : Union [CWLObjectType , MutableSequence [CWLObjectType ], None ],
728
755
process_run_id : Optional [str ],
729
756
name : Optional [str ],
757
+ load_listing : str = "deep_listing"
730
758
) -> None :
731
759
"""Call wasGeneratedBy() for each output,copy the files into the RO."""
732
760
if isinstance (final_output , MutableSequence ):
733
761
for entry in final_output :
734
- self .generate_output_prov (entry , process_run_id , name )
762
+ self .generate_output_prov (entry , process_run_id , name , load_listing )
735
763
elif final_output is not None :
736
764
# Timestamp should be created at the earliest
737
765
timestamp = datetime .datetime .now ()
@@ -740,7 +768,7 @@ def generate_output_prov(
740
768
# entity (UUID) and document it as generated in
741
769
# a role corresponding to the output
742
770
for output , value in final_output .items ():
743
- entity = self .declare_artefact (value )
771
+ entity = self .declare_artefact (value , load_listing )
744
772
if name is not None :
745
773
name = urllib .parse .quote (str (name ), safe = ":/,#" )
746
774
# FIXME: Probably not "main" in nested workflows
0 commit comments