Skip to content

Commit 80e867d

Browse files
committed
Fix test_parsl.py
* Remove test of DataFlow * Use tmp_path fixture * Try to reduce parsl verbosity
1 parent ee39742 commit 80e867d

File tree

1 file changed

+20
-126
lines changed

1 file changed

+20
-126
lines changed

tests/test_parsl.py

Lines changed: 20 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,25 @@
1111
Institute for Biomedical Research and Pelkmans Lab from the University of
1212
Zurich.
1313
"""
14-
import itertools
1514
import os
1615
import pathlib
1716
import subprocess
1817

1918
import parsl
2019
import pytest
20+
from devtools import debug
2121
from parsl.addresses import address_by_hostname
22-
from parsl.app.app import bash_app
2322
from parsl.app.app import python_app
2423
from parsl.channels import LocalChannel
2524
from parsl.config import Config
26-
from parsl.data_provider.files import File
2725
from parsl.executors import HighThroughputExecutor
2826
from parsl.launchers import SingleNodeLauncher
2927
from parsl.launchers import SrunLauncher
3028
from parsl.providers import LocalProvider
3129
from parsl.providers import SlurmProvider
3230

31+
PARSL_DEBUG = False
32+
3333
try:
3434
process = subprocess.Popen(
3535
["sinfo"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
@@ -58,7 +58,7 @@ def initialize_SlurmProvider():
5858
mem_per_node=1, # specified in GB
5959
parallelism=0,
6060
partition="main",
61-
launcher=SrunLauncher(debug=True),
61+
launcher=SrunLauncher(debug=PARSL_DEBUG),
6262
)
6363
return slurm
6464

@@ -70,7 +70,7 @@ def initialize_LocalProvider():
7070
min_blocks=1,
7171
max_blocks=1,
7272
parallelism=0,
73-
launcher=SingleNodeLauncher(debug=True),
73+
launcher=SingleNodeLauncher(debug=PARSL_DEBUG),
7474
)
7575
return local
7676

@@ -80,7 +80,7 @@ def initialize_HighThroughputExecutor(provider=None):
8080
htex = HighThroughputExecutor(
8181
label="htex",
8282
address=address_by_hostname(),
83-
worker_debug=True,
83+
worker_debug=PARSL_DEBUG,
8484
provider=provider,
8585
)
8686
return htex
@@ -109,7 +109,7 @@ def increment_by_one(num):
109109
assert incremented_numbers == [i + 1 for i in range(N)]
110110

111111

112-
def test_workflow_generate_combine_split(tmp_path: pathlib.Path):
112+
def test_import_numpy(tmp_path: pathlib.Path):
113113
if HAS_SLURM:
114114
provider = initialize_SlurmProvider()
115115
else:
@@ -119,101 +119,7 @@ def test_workflow_generate_combine_split(tmp_path: pathlib.Path):
119119
parsl.clear()
120120
parsl.load(config)
121121

122-
# Generate a file with three numbers (well, channel, randint(0, 100))
123-
@python_app
124-
def task_generate(inputs=[], outputs=[]):
125-
import random
126-
127-
with open(outputs[0], "w") as out:
128-
random_number = random.randint(0, 100)
129-
out.write(f"{inputs[0]} {inputs[1]} {random_number}\n")
130-
131-
# Combine input files into a single output file
132-
@bash_app
133-
def task_combine(inputs=[], outputs=[]):
134-
return "cat {0} > {1}".format(
135-
" ".join([i.filepath for i in inputs]), outputs[0]
136-
)
137-
138-
# Scan input file, find relevant line, write it on output file
139-
@python_app
140-
def task_split(inputs=[], outputs=[]):
141-
with open(inputs[0], "r") as f1:
142-
for line in f1:
143-
w, c, num = [int(i) for i in line.split()]
144-
if w == inputs[1] and c == inputs[2]:
145-
with open(outputs[0], "w") as f2:
146-
f2.write(f"{w} {c} {num}\n")
147-
148-
# Create files
149-
tmp_dir = tmp_path.resolve().as_posix()
150-
n_wells = 2
151-
n_channels = 2
152-
output_files = []
153-
for well, channel in itertools.product(range(n_wells), range(n_channels)):
154-
output_files.append(
155-
task_generate(
156-
inputs=[well, channel],
157-
outputs=[
158-
File(
159-
os.path.join(
160-
tmp_dir,
161-
f"file-w{well}-c{channel}.txt",
162-
)
163-
)
164-
],
165-
)
166-
)
167-
168-
# Concatenate the files into a single file
169-
cc = task_combine(
170-
inputs=[i.outputs[0] for i in output_files],
171-
outputs=[File(os.path.join(tmp_dir, "combined_data.txt"))],
172-
)
173-
174-
# Split the single file back into many files
175-
inputs = []
176-
outputs = []
177-
for well, channel in itertools.product(range(n_wells), range(n_channels)):
178-
inputs.append([cc.outputs[0], well, channel])
179-
outputs.append(
180-
[
181-
File(
182-
os.path.join(
183-
tmp_dir,
184-
f"processed_file-w{well}-c{channel}.txt",
185-
)
186-
)
187-
]
188-
)
189-
190-
split = [
191-
task_split(inputs=inputs[i], outputs=outputs[i])
192-
for i in range(len(inputs))
193-
]
194-
[x.result() for x in split]
195-
196-
# Verify that all files are generated
197-
for well, channel in itertools.product(range(n_wells), range(n_channels)):
198-
assert os.path.isfile(
199-
os.path.join(tmp_dir, f"file-w{well}-c{channel}.txt")
200-
)
201-
assert os.path.isfile(os.path.join(tmp_dir, "combined_data.txt"))
202-
for well, channel in itertools.product(range(n_wells), range(n_channels)):
203-
assert os.path.isfile(
204-
os.path.join(tmp_dir, f"processed_file-w{well}-c{channel}.txt")
205-
)
206-
207-
208-
def test_import_numpy():
209-
if HAS_SLURM:
210-
provider = initialize_SlurmProvider()
211-
else:
212-
provider = initialize_LocalProvider()
213-
htex = initialize_HighThroughputExecutor(provider=provider)
214-
config = Config(executors=[htex])
215-
parsl.clear()
216-
parsl.load(config)
122+
debug(tmp_path)
217123

218124
@python_app
219125
def importnumpy():
@@ -237,9 +143,7 @@ def importnumpy():
237143
app = importnumpy()
238144
info = app.result()
239145

240-
if not os.path.isdir("tmp_data"):
241-
os.makedirs("tmp_data")
242-
with open(os.path.join(os.getcwd(), "tmp_data/info.txt"), "w") as out:
146+
with open(tmp_path / "info.txt", "w") as out:
243147
out.write(info)
244148

245149

@@ -290,10 +194,10 @@ def test_use_tensorflow_on_gpu():
290194
slurm = SlurmProvider(
291195
partition="gpu",
292196
channel=LocalChannel(),
293-
launcher=SrunLauncher(debug=True),
197+
launcher=SrunLauncher(debug=PARSL_DEBUG),
294198
)
295199
htex = HighThroughputExecutor(
296-
address=address_by_hostname(), worker_debug=True, provider=slurm
200+
address=address_by_hostname(), worker_debug=PARSL_DEBUG, provider=slurm
297201
)
298202
config = Config(executors=[htex])
299203
parsl.clear()
@@ -322,23 +226,23 @@ def test_multiexecutor_workflow():
322226
slurm_cpu = SlurmProvider(
323227
partition="main",
324228
channel=LocalChannel(),
325-
launcher=SrunLauncher(debug=True),
229+
launcher=SrunLauncher(debug=PARSL_DEBUG),
326230
)
327231
slurm_gpu = SlurmProvider(
328232
partition="gpu",
329233
channel=LocalChannel(),
330-
launcher=SrunLauncher(debug=True),
234+
launcher=SrunLauncher(debug=PARSL_DEBUG),
331235
)
332236
htex_cpu = HighThroughputExecutor(
333237
address=address_by_hostname(),
334238
label="htex_cpu",
335-
worker_debug=True,
239+
worker_debug=PARSL_DEBUG,
336240
provider=slurm_cpu,
337241
)
338242
htex_gpu = HighThroughputExecutor(
339243
address=address_by_hostname(),
340244
label="htex_gpu",
341-
worker_debug=True,
245+
worker_debug=PARSL_DEBUG,
342246
provider=slurm_gpu,
343247
)
344248
config = Config(executors=[htex_cpu, htex_gpu])
@@ -374,31 +278,31 @@ def test_multiexecutor_workflow_flexible():
374278
slurm_cpu = SlurmProvider(
375279
partition="main",
376280
channel=LocalChannel(),
377-
launcher=SrunLauncher(debug=True),
281+
launcher=SrunLauncher(debug=PARSL_DEBUG),
378282
)
379283
slurm_gpu = SlurmProvider(
380284
partition="gpu",
381285
channel=LocalChannel(),
382-
launcher=SrunLauncher(debug=True),
286+
launcher=SrunLauncher(debug=PARSL_DEBUG),
383287
)
384288

385289
htex_cpu = HighThroughputExecutor(
386290
address=address_by_hostname(),
387291
label="htex_cpu",
388-
worker_debug=True,
292+
worker_debug=PARSL_DEBUG,
389293
provider=slurm_cpu,
390294
)
391295

392296
htex_cpu = HighThroughputExecutor(
393297
address=address_by_hostname(),
394298
label="htex_cpu",
395-
worker_debug=True,
299+
worker_debug=PARSL_DEBUG,
396300
provider=slurm_cpu,
397301
)
398302
htex_gpu = HighThroughputExecutor(
399303
address=address_by_hostname(),
400304
label="htex_gpu",
401-
worker_debug=True,
305+
worker_debug=PARSL_DEBUG,
402306
provider=slurm_gpu,
403307
)
404308
config = Config(
@@ -482,13 +386,3 @@ def increment_by_three_gpu(num):
482386

483387
print(f"seven: {seven}")
484388
assert seven == 7
485-
486-
487-
if __name__ == "__main__":
488-
test_single_app()
489-
test_workflow_generate_combine_split()
490-
test_workflow_compute_pi()
491-
test_import_numpy()
492-
test_use_tensorflow_on_gpu()
493-
test_multiexecutor_workflow()
494-
test_multiexecutor_workflow_flexible()

0 commit comments

Comments
 (0)