@@ -3,13 +3,15 @@
 import json
 import logging
 import sys
+import asyncio
 import tempfile
 from contextlib import contextmanager
 from copy import deepcopy
 import operator
 import itertools
 from pathlib import Path
-from typing import Any, Dict, List, Set, Callable
+from typing import Any, Dict, List, Set, Callable, Tuple
+from collections import deque

 import aiofiles
 import aiohttp
@@ -34,6 +36,10 @@
 from simcore_service_webserver.session import setup_session
 from simcore_service_webserver.socketio import setup_socketio
 from simcore_service_webserver.users import setup_users
+from simcore_service_webserver.storage_handlers import get_file_download_url
+from simcore_service_webserver.storage import setup_storage
+from simcore_service_webserver.exporter.file_downloader import ParallelDownloader
+from simcore_service_webserver.exporter.async_hashing import Algorithm, checksum
 from yarl import URL

 log = logging.getLogger(__name__)
@@ -57,7 +63,14 @@
 # store only lowercase "v1", "v2", etc...
 SUPPORTED_EXPORTER_VERSIONS = {"v1"}

-KEYS_TO_IGNORE_FROM_COMPARISON = {"id", "uuid", "creation_date", "last_change_date"}
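+# key under which replace_uuids_with_sequences() stores its uuid -> sequence mapping
+# in the normalized project; it is excluded from project comparisons below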
+REVERSE_REMAPPING_KEY = "__reverse__remapping__dict__key__"
+KEYS_TO_IGNORE_FROM_COMPARISON = {
+    "id",
+    "uuid",
+    "creation_date",
+    "last_change_date",
+    REVERSE_REMAPPING_KEY,
+}


 @pytest.fixture
@@ -140,6 +153,7 @@ def client(
     setup_director(app)
     setup_director_v2(app)
     setup_exporter(app)
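+    # storage is needed so the file download links used in the checks below can be resolved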
+    setup_storage(app)
     assert setup_resource_manager(app)

     yield loop.run_until_complete(
@@ -233,7 +247,9 @@ async def query_project_from_db(
     return dict(project)


-def replace_uuids_with_sequences(project: Dict[str, Any]) -> Dict[str, Any]:
+def replace_uuids_with_sequences(original_project: Dict[str, Any]) -> Dict[str, Any]:
+    # first make a copy
+    project = deepcopy(original_project)
     workbench = project["workbench"]
     ui = project["ui"]

@@ -260,6 +276,10 @@ def replace_uuids_with_sequences(project: Dict[str, Any]) -> Dict[str, Any]:

     project["workbench"] = json.loads(str_workbench)
     project["ui"] = json.loads(str_ui)
+    # store for later usage
+    project[
+        REVERSE_REMAPPING_KEY
+    ] = remapping_dict  # {v: k for k, v in remapping_dict.items()}

     return project

@@ -273,12 +293,106 @@ def dict_without_keys(dict_data: Dict[str, Any], keys: Set[str]) -> Dict[str, An

 def assert_combined_entires_condition(
     *entries: Any, condition_operator: Callable
-) -> bool:
+) -> None:
     """Ensures the condition_operator is True for all unique combinations"""
     for combination in itertools.combinations(entries, 2):
         assert condition_operator(combination[0], combination[1]) is True


+def extract_original_files_for_node_sequence(
+    project: Dict[str, Any], normalized_project: Dict[str, Any]
+) -> Dict[str, Dict[str, str]]:
+    """
+    Extracts the store and path from the output_1 field of each node and
+    returns them keyed by the normalized (sequence) node keys for simpler comparison
+    """
+    results = {}
+    reverse_search_dict = normalized_project[REVERSE_REMAPPING_KEY]
+
+    for uuid_key, node in project["workbench"].items():
+        output_1 = node["outputs"]["output_1"]
+        sequence_key = reverse_search_dict[uuid_key]
+        results[sequence_key] = {"store": output_1["store"], "path": output_1["path"]}
+
+    return results
+
+
+async def extract_download_links_from_storage(
+    app: aiohttp.web.Application,
+    original_files: Dict[str, Dict[str, str]],
+    user_id: str,
+) -> Dict[str, str]:
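+    """Concurrently resolves each (store, path) pair to a download link"""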
+    async def _get_mapped_link(
+        seq_key: str, location_id: str, raw_file_path: str
+    ) -> Tuple[str, str]:
+        link = await get_file_download_url(
+            app=app,
+            location_id=location_id,
+            fileId=raw_file_path,
+            user_id=user_id,
+        )
+        return seq_key, link
+
+    tasks = deque()
+    for seq_key, data in original_files.items():
+        tasks.append(
+            _get_mapped_link(
+                seq_key=seq_key,
+                location_id=data["store"],
+                raw_file_path=data["path"],
+            )
+        )
+
+    results = await asyncio.gather(*tasks)
+
+    return {x[0]: x[1] for x in results}
+
+
+async def download_files_and_get_checksums(
+    app: aiohttp.web.Application, download_links: Dict[str, str]
+) -> Dict[str, str]:
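+    """Downloads files to a temporary directory and returns their SHA256 checksums"""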
+    with tempfile.TemporaryDirectory() as store_dir:
+        download_paths = {}
+        parallel_downloader = ParallelDownloader()
+        for seq_id, url in download_links.items():
+            download_path = Path(store_dir) / seq_id
+            await parallel_downloader.append_file(link=url, download_path=download_path)
+            download_paths[seq_id] = download_path
+
+        await parallel_downloader.download_files(app=app)
+
+        # compute checksums for each downloaded file
+        checksums = {}
+        for seq_id, download_path in download_paths.items():
+            checksums[seq_id] = await checksum(
+                file_path=download_path, algorithm=Algorithm.SHA256
+            )
+
+        return checksums
+
+
+async def get_checksums_for_files_in_storage(
+    app: aiohttp.web.Application,
+    project: Dict[str, Any],
+    normalized_project: Dict[str, Any],
+    user_id: str,
+) -> Dict[str, str]:
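+    """Returns checksums of the project's files in storage, keyed by sequence key"""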
+    original_files = extract_original_files_for_node_sequence(
+        project=project, normalized_project=normalized_project
+    )
+
+    download_links = await extract_download_links_from_storage(
+        app=app, original_files=original_files, user_id=user_id
+    )
+
+    files_checksums = await download_files_and_get_checksums(
+        app=app,
+        download_links=download_links,
+    )
+
+    return files_checksums
+
+
 ################ end utils


@@ -314,7 +428,7 @@ async def test_import_export_import_duplicate(
     produces the same result in the DB.
     """

-    _ = await login_user(client)
+    user = await login_user(client)
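+    # the logged-in user's id is needed below to resolve storage download links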
     export_file_name = export_version.name
     version_from_name = export_file_name.split("#")[0]

@@ -381,6 +495,7 @@ async def test_import_export_import_duplicate(
         condition_operator=operator.ne,
     )

+    # assert same structure in both projects
     assert_combined_entires_condition(
         dict_without_keys(normalized_imported_project, KEYS_TO_IGNORE_FROM_COMPARISON),
         dict_without_keys(
@@ -391,3 +506,30 @@ async def test_import_export_import_duplicate(
         ),
         condition_operator=operator.eq,
     )
+
+    # check that the checksums of the files in storage match
+    imported_files_checksums = await get_checksums_for_files_in_storage(
+        app=client.app,
+        project=imported_project,
+        normalized_project=normalized_imported_project,
+        user_id=user["id"],
+    )
+    reimported_files_checksums = await get_checksums_for_files_in_storage(
+        app=client.app,
+        project=reimported_project,
+        normalized_project=normalized_reimported_project,
+        user_id=user["id"],
+    )
+    duplicated_files_checksums = await get_checksums_for_files_in_storage(
+        app=client.app,
+        project=duplicated_project,
+        normalized_project=normalized_duplicated_project,
+        user_id=user["id"],
+    )
+
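+    # all three projects must reference files with identical content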
+    assert_combined_entires_condition(
+        imported_files_checksums,
+        reimported_files_checksums,
+        duplicated_files_checksums,
+        condition_operator=operator.eq,
+    )