from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple
+ from collections import deque

import aiobotocore
import aiofiles

from s3wrapper.s3_client import S3Client
from servicelib.aiopg_utils import DBAPIError, PostgresRetryPolicyUponOperation
from servicelib.client_session import get_client_session
+ from servicelib.utils import fire_and_forget_task

+ from .utils import expo

from .datcore_wrapper import DatcoreWrapper
from .models import (
    DatasetMetaData,
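The two helper imports added here, `expo` (from `.utils`) and `fire_and_forget_task` (from `servicelib.utils`), are used further down but their implementations are not part of this diff. As a rough sketch of the kind of exponential-backoff generator `expo` is assumed to be (names and defaults below are assumptions, not the actual `.utils` code):

# assumed sketch of an exponential-backoff interval generator similar to `expo`
def expo(base: float = 1.2, factor: float = 0.1, max_value: float = 2.0):
    """Yield sleep intervals that grow exponentially and are capped at max_value seconds."""
    exponent = 0
    while True:
        interval = factor * (base ** exponent)
        if interval < max_value:
            exponent += 1
            yield interval
        else:
            yield max_value

The `metadata_file_updater` added below only relies on `next(sleep_generator)` returning a number of seconds to sleep, so any generator with this shape would do.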
@@ -198,7 +201,7 @@ async def list_files(

            Can filter upon regular expression (for now only on key: value pairs of the FileMetaData)
        """
-         data = []
+         data = deque()
        if location == SIMCORE_S3_STR:
            async with self.engine.acquire() as conn:
                query = sa.select([file_meta_data]).where(
@@ -234,87 +237,29 @@ async def list_files(
                # there seems to be no project whatsoever for user_id
                return []

-             # only keep files from non-deleted project --> This needs to be fixed
-             clean_data = []
+             # only keep files from non-deleted project
+             clean_data = deque()
            for dx in data:
                d = dx.fmd
-                 if d.project_id in uuid_name_dict:
-                     d.project_name = uuid_name_dict[d.project_id]
-                     if d.node_id in uuid_name_dict:
-                         d.node_name = uuid_name_dict[d.node_id]
-
-                     d.raw_file_path = str(
-                         Path(d.project_id) / Path(d.node_id) / Path(d.file_name)
-                     )
-                     d.display_file_path = d.raw_file_path
-                     d.file_id = d.file_uuid
-                     if d.node_name and d.project_name:
-                         d.display_file_path = str(
-                             Path(d.project_name)
-                             / Path(d.node_name)
-                             / Path(d.file_name)
-                         )
-                         async with self.engine.acquire() as conn:
-                             query = (
-                                 file_meta_data.update()
-                                 .where(
-                                     and_(
-                                         file_meta_data.c.node_id == d.node_id,
-                                         file_meta_data.c.user_id == d.user_id,
-                                     )
-                                 )
-                                 .values(
-                                     project_name=d.project_name,
-                                     node_name=d.node_name,
-                                     raw_file_path=d.raw_file_path,
-                                     file_id=d.file_id,
-                                     display_file_path=d.display_file_path,
-                                 )
-                             )
-                             await conn.execute(query)
-                     clean_data.append(dx)
+                 if d.project_id not in uuid_name_dict:
+                     continue

-             data = clean_data
+                 d.project_name = uuid_name_dict[d.project_id]
+                 if d.node_id in uuid_name_dict:
+                     d.node_name = uuid_name_dict[d.node_id]

-             # same as above, make sure file is physically present on s3
-             clean_data = []
-             # FIXME: MaG: This is inefficient: Do this automatically when file is modified
-             session = aiobotocore.get_session()
-             async with session.create_client(
-                 "s3",
-                 endpoint_url=self.s3_client.endpoint_url,
-                 aws_access_key_id=self.s3_client.access_key,
-                 aws_secret_access_key=self.s3_client.secret_key,
-             ) as client:
-                 responses = await asyncio.gather(
-                     *[
-                         client.list_objects_v2(
-                             Bucket=_d.bucket_name, Prefix=_d.object_name
-                         )
-                         for _d in [__d.fmd for __d in data]
-                     ]
+                 d.raw_file_path = str(
+                     Path(d.project_id) / Path(d.node_id) / Path(d.file_name)
                )
-                 for dx, resp in zip(data, responses):
-                     if "Contents" in resp:
-                         clean_data.append(dx)
-                         d = dx.fmd
-                         d.file_size = resp["Contents"][0]["Size"]
-                         d.last_modified = str(resp["Contents"][0]["LastModified"])
-                         async with self.engine.acquire() as conn:
-                             query = (
-                                 file_meta_data.update()
-                                 .where(
-                                     and_(
-                                         file_meta_data.c.node_id == d.node_id,
-                                         file_meta_data.c.user_id == d.user_id,
-                                     )
-                                 )
-                                 .values(
-                                     file_size=d.file_size,
-                                     last_modified=d.last_modified,
-                                 )
-                             )
-                             await conn.execute(query)
+                 d.display_file_path = d.raw_file_path
+                 d.file_id = d.file_uuid
+                 if d.node_name and d.project_name:
+                     d.display_file_path = str(
+                         Path(d.project_name) / Path(d.node_name) / Path(d.file_name)
+                     )
+                     # the data is already synced to the postgres metadata table at this point
+                 clean_data.append(dx)
+
            data = clean_data

        elif location == DATCORE_STR:
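The list-to-deque switch in this hunk (and in the two hunks below) keeps the repeated accumulate-and-filter passes cheap, while `return list(data)` at the end of `list_files` preserves the plain-list return type for callers. A minimal, standalone sketch of that pattern, independent of this codebase:

# minimal sketch of the accumulate-with-deque, return-a-list pattern
from collections import deque
from typing import Iterable, List

def keep_even(numbers: Iterable[int]) -> List[int]:
    kept = deque()  # O(1) appends while filtering
    for n in numbers:
        if n % 2 != 0:
            continue  # skip early, mirroring the `continue` used in list_files
        kept.append(n)
    return list(kept)  # callers still receive a plain list

assert keep_even(range(6)) == [0, 2, 4]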
@@ -324,7 +269,7 @@ async def list_files(

        if uuid_filter:
            _query = re.compile(uuid_filter, re.IGNORECASE)
-             filtered_data = []
+             filtered_data = deque()
            for dx in data:
                d = dx.fmd
                if _query.search(d.file_uuid):
@@ -334,7 +279,7 @@ async def list_files(

        if regex:
            _query = re.compile(regex, re.IGNORECASE)
-             filtered_data = []
+             filtered_data = deque()
            for dx in data:
                d = dx.fmd
                _vars = vars(d)
@@ -344,7 +289,7 @@ async def list_files(
                        break
            return filtered_data

-         return data
+         return list(data)

    async def list_files_dataset(
        self, user_id: str, location: str, dataset_id: str
@@ -468,9 +413,80 @@ async def upload_file_to_datcore(

        # actually we have to query the master db

+     async def metadata_file_updater(
+         self,
+         file_uuid: str,
+         bucket_name: str,
+         object_name: str,
+         file_size: int,
+         last_modified: str,
+         max_update_retries: int = 50,
+     ):
+         """
+         Retries at most max_update_retries times to update the file's metadata after an upload.
+         If it is not successful, it exits and logs an error.
+
+         Note: MinIO bucket notifications are not available with S3, which is why this
+         polling-based workaround is used
+         """
+         current_iteration = 0
+
+         session = aiobotocore.get_session()
+         async with session.create_client(
+             "s3",
+             endpoint_url=self.s3_client.endpoint_url,
+             aws_access_key_id=self.s3_client.access_key,
+             aws_secret_access_key=self.s3_client.secret_key,
+         ) as client:
+             continue_loop = True
+             sleep_generator = expo()
+             update_succeeded = False
+
+             while continue_loop:
+                 current_iteration += 1
+                 result = await client.list_objects_v2(
+                     Bucket=bucket_name, Prefix=object_name
+                 )
+                 sleep_amount = next(sleep_generator)
+                 continue_loop = current_iteration <= max_update_retries
+
+                 if "Contents" not in result:
+                     logger.info("File '%s' was not found in the bucket", object_name)
+                     await asyncio.sleep(sleep_amount)
+                     continue
+
+                 new_file_size = result["Contents"][0]["Size"]
+                 new_last_modified = str(result["Contents"][0]["LastModified"])
+                 if file_size == new_file_size or last_modified == new_last_modified:
+                     logger.info("File '%s' did not change yet", object_name)
+                     await asyncio.sleep(sleep_amount)
+                     continue
+
+                 # finally update the data in the database and exit
+                 continue_loop = False
+
+                 logger.info(
+                     "Obtained this from S3: new_file_size=%s new_last_modified=%s",
+                     new_file_size,
+                     new_last_modified,
+                 )
+
+                 async with self.engine.acquire() as conn:
+                     query = (
+                         file_meta_data.update()
+                         .where(file_meta_data.c.file_uuid == file_uuid)
+                         .values(
+                             file_size=new_file_size, last_modified=new_last_modified
+                         )
+                     )  # primary key search is faster
+                     await conn.execute(query)
+                     update_succeeded = True
+             if not update_succeeded:
+                 logger.error("Could not update file metadata for '%s'", file_uuid)
+
    async def upload_link(self, user_id: str, file_uuid: str):
        @retry(**postgres_service_retry_policy_kwargs)
-         async def _execute_query():
+         async def _execute_query() -> Tuple[int, str]:
            async with self.engine.acquire() as conn:
                fmd = FileMetaData()
                fmd.simcore_from_uuid(file_uuid, self.simcore_bucket_name)
@@ -484,11 +500,24 @@ async def _execute_query():
                if exists is None:
                    ins = file_meta_data.insert().values(**vars(fmd))
                    await conn.execute(ins)
+                 return fmd.file_size, fmd.last_modified

-         await _execute_query()
+         file_size, last_modified = await _execute_query()

        bucket_name = self.simcore_bucket_name
        object_name = file_uuid
+
+         # a parallel task is started which will update the metadata of the uploaded file
+         # once the upload has finished.
+         fire_and_forget_task(
+             self.metadata_file_updater(
+                 file_uuid=file_uuid,
+                 bucket_name=bucket_name,
+                 object_name=object_name,
+                 file_size=file_size,
+                 last_modified=last_modified,
+             )
+         )
        return self.s3_client.create_presigned_put_url(bucket_name, object_name)

    async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str):
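`upload_link` now schedules `metadata_file_updater` through `fire_and_forget_task` from `servicelib.utils`, whose implementation is not shown in this diff. A minimal sketch of such a helper, assuming it only needs to schedule the coroutine and keep a reference so the task is not garbage-collected before it finishes (the real servicelib version may differ):

# assumed sketch of a fire-and-forget helper; the real servicelib.utils version may differ
import asyncio
from typing import Coroutine, Set

_background_tasks: Set[asyncio.Future] = set()

def fire_and_forget_task(coro: Coroutine) -> asyncio.Future:
    """Schedule coro on the running loop and hold a reference until it completes."""
    task = asyncio.ensure_future(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task

Holding the reference matters here: without it, the updater task could be collected mid-poll and the metadata row would silently never be refreshed.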
@@ -742,7 +771,7 @@ async def deep_copy_project_simcore_s3(
                await conn.execute(ins)

    async def delete_project_simcore_s3(
-         self, user_id: str, project_id: str, node_id: Optional[str]=None
+         self, user_id: str, project_id: str, node_id: Optional[str] = None
    ) -> web.Response:
        """ Deletes all files from a given node in a project in simcore.s3 and updated db accordingly.
        If node_id is not given, then all the project files db entries are deleted.
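For context, the end-to-end flow introduced by these changes, written as a hypothetical usage sketch (`dsm`, `user_id`, `file_uuid` and the aiohttp upload are placeholders; the real call sites live in the storage handlers, not in this diff):

# hypothetical sketch of the upload flow after this change (names are placeholders)
import aiohttp

async def upload_through_presigned_url(dsm, user_id: str, file_uuid: str, payload: bytes):
    # 1. upload_link inserts the file_meta_data row if needed, fires
    #    metadata_file_updater in the background and returns a presigned PUT URL
    put_url = await dsm.upload_link(user_id, file_uuid)

    # 2. the client uploads directly to S3 through the presigned URL
    async with aiohttp.ClientSession() as session:
        async with session.put(put_url, data=payload) as resp:
            resp.raise_for_status()

    # 3. in the background, metadata_file_updater polls S3 with exponential backoff
    #    and writes the new file_size / last_modified back to the database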