1
+ # pylint:disable=unused-variable
2
+ # pylint:disable=unused-argument
3
+ # pylint:disable=redefined-outer-name
4
+ # pylint:disable=too-many-instance-attributes
5
+
1
6
import contextlib
2
7
import json
3
8
import logging
8
13
from dataclasses import dataclass , field
9
14
from datetime import UTC , datetime , timedelta
10
15
from enum import Enum , unique
16
+ from pathlib import Path
11
17
from typing import Any , Final
12
18
19
+ import pytest
13
20
from playwright ._impl ._sync_base import EventContextManager
14
- from playwright .sync_api import APIRequestContext , FrameLocator , Locator , Page , Request
21
+ from playwright .sync_api import (
22
+ APIRequestContext ,
23
+ )
24
+ from playwright .sync_api import Error as PlaywrightError
25
+ from playwright .sync_api import (
26
+ FrameLocator ,
27
+ Locator ,
28
+ Page ,
29
+ Request ,
30
+ )
15
31
from playwright .sync_api import TimeoutError as PlaywrightTimeoutError
16
- from playwright .sync_api import WebSocket
32
+ from playwright .sync_api import (
33
+ WebSocket ,
34
+ )
17
35
from pydantic import AnyUrl
18
36
19
37
from .logging_tools import log_context
@@ -63,12 +81,15 @@ class NodeProgressType(str, Enum):
63
81
# NOTE: this is a partial duplicate of models_library/rabbitmq_messages.py
64
82
# It must remain as such until that module is pydantic V2 compatible
65
83
CLUSTER_UP_SCALING = "CLUSTER_UP_SCALING"
66
- SERVICE_INPUTS_PULLING = "SERVICE_INPUTS_PULLING"
67
84
SIDECARS_PULLING = "SIDECARS_PULLING"
85
+ SERVICE_INPUTS_PULLING = "SERVICE_INPUTS_PULLING"
68
86
SERVICE_OUTPUTS_PULLING = "SERVICE_OUTPUTS_PULLING"
69
87
SERVICE_STATE_PULLING = "SERVICE_STATE_PULLING"
70
88
SERVICE_IMAGES_PULLING = "SERVICE_IMAGES_PULLING"
71
89
SERVICE_CONTAINERS_STARTING = "SERVICE_CONTAINERS_STARTING"
90
+ SERVICE_STATE_PUSHING = "SERVICE_STATE_PUSHING"
91
+ SERVICE_OUTPUTS_PUSHING = "SERVICE_OUTPUTS_PUSHING"
92
+ PROJECT_CLOSING = "PROJECT_CLOSING"
72
93
73
94
@classmethod
74
95
def required_types_for_started_service (cls ) -> set ["NodeProgressType" ]:
@@ -94,6 +115,7 @@ class _OSparcMessages(str, Enum):
94
115
SERVICE_DISK_USAGE = "serviceDiskUsage"
95
116
WALLET_OSPARC_CREDITS_UPDATED = "walletOsparcCreditsUpdated"
96
117
LOGGER = "logger"
118
+ SERVICE_STATUS = "serviceStatus"
97
119
98
120
99
121
@dataclass (frozen = True , slots = True , kw_only = True )
@@ -107,6 +129,9 @@ class SocketIOEvent:
107
129
name : str
108
130
obj : dict [str , Any ]
109
131
132
+ def to_json (self ) -> str :
133
+ return json .dumps ({"name" : self .name , "obj" : self .obj })
134
+
110
135
111
136
SOCKETIO_MESSAGE_PREFIX : Final [str ] = "42"
112
137
@@ -286,23 +311,45 @@ def __call__(self, message: str) -> None:
286
311
print ("WS Message:" , decoded_message .name , decoded_message .obj )
287
312
288
313
314
+ _FAIL_FAST_DYNAMIC_SERVICE_STATES : Final [tuple [str , ...]] = ("idle" , "failed" )
315
+
316
+
289
317
@dataclass
290
318
class SocketIONodeProgressCompleteWaiter :
291
319
node_id : str
292
320
logger : logging .Logger
293
321
product_url : AnyUrl
294
322
api_request_context : APIRequestContext
295
323
is_service_legacy : bool
324
+ assertion_output_folder : Path
296
325
_current_progress : dict [NodeProgressType , float ] = field (
297
326
default_factory = defaultdict
298
327
)
299
328
_last_poll_timestamp : datetime = field (default_factory = lambda : datetime .now (tz = UTC ))
329
+ _received_messages : list [SocketIOEvent ] = field (default_factory = list )
330
+ _service_ready : bool = False
300
331
301
332
def __call__ (self , message : str ) -> bool :
302
333
# socket.io encodes messages like so
303
334
# https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
304
335
if message .startswith (SOCKETIO_MESSAGE_PREFIX ):
305
336
decoded_message = decode_socketio_42_message (message )
337
+ self ._received_messages .append (decoded_message )
338
+ if (
339
+ (decoded_message .name == _OSparcMessages .SERVICE_STATUS .value )
340
+ and (decoded_message .obj ["service_uuid" ] == self .node_id )
341
+ and (
342
+ decoded_message .obj ["service_state" ]
343
+ in _FAIL_FAST_DYNAMIC_SERVICE_STATES
344
+ )
345
+ ):
346
+ # NOTE: this is a fail fast for dynamic services that fail to start
347
+ self .logger .error (
348
+ "node %s failed with state %s, failing fast" ,
349
+ self .node_id ,
350
+ decoded_message .obj ["service_state" ],
351
+ )
352
+ return True
306
353
if decoded_message .name == _OSparcMessages .NODE_PROGRESS .value :
307
354
node_progress_event = retrieve_node_progress_from_decoded_message (
308
355
decoded_message
@@ -327,48 +374,56 @@ def __call__(self, message: str) -> bool:
327
374
len (NodeProgressType .required_types_for_started_service ()),
328
375
f"{ json .dumps ({k : round (v , 2 ) for k , v in self ._current_progress .items ()})} " ,
329
376
)
330
-
331
- return self .got_expected_node_progress_types () and all (
377
+ self ._service_ready = self .got_expected_node_progress_types () and all (
332
378
round (progress , 1 ) == 1.0
333
379
for progress in self ._current_progress .values ()
334
380
)
381
+ return self ._service_ready
335
382
336
383
_current_timestamp = datetime .now (UTC )
337
384
if _current_timestamp - self ._last_poll_timestamp > timedelta (seconds = 5 ):
385
+ # NOTE: we might have missed some websocket messages, and we check if the service is ready
338
386
if self .is_service_legacy :
339
387
url = f"https://{ self .get_partial_product_url ()} x/{ self .node_id } /"
340
388
else :
341
389
url = (
342
390
f"https://{ self .node_id } .services.{ self .get_partial_product_url ()} "
343
391
)
344
- with contextlib .suppress (PlaywrightTimeoutError ):
345
- response = self .api_request_context .get (url , timeout = 1000 )
346
- level = logging .DEBUG
347
- if (response .status >= 400 ) and (response .status not in (502 , 503 )):
348
- level = logging .ERROR
392
+ response = None
393
+ with contextlib .suppress (
394
+ PlaywrightTimeoutError , TimeoutError , PlaywrightError
395
+ ):
396
+ response = self .api_request_context .get (url , timeout = 5000 )
397
+ if response :
349
398
self .logger .log (
350
- level ,
399
+ (
400
+ logging .ERROR
401
+ if (response .status >= 400 )
402
+ and (response .status not in (502 , 503 ))
403
+ else logging .DEBUG
404
+ ),
351
405
"Querying service endpoint in case we missed some websocket messages. Url: %s Response: '%s' TIP: %s" ,
352
406
url ,
353
407
f"{ response .status } : { response .text ()} " ,
354
408
(
355
- "We are emulating the frontend; a 5XX response is acceptable if the service is not yet ready."
409
+ "We are emulating the frontend; a 502/503 response is acceptable if the service is not yet ready."
356
410
),
357
411
)
358
412
359
413
if response .status <= 400 :
360
- # NOTE: If the response status is less than 400, it means that the backend is ready (There are some services that respond with a 3XX)
414
+ # NOTE: If the response status is less than 400, it means that the service is ready (There are some services that respond with a 3XX)
361
415
if self .got_expected_node_progress_types ():
362
416
self .logger .warning (
363
417
"⚠️ Progress bar didn't receive 100 percent but service is already running: %s. TIP: we missed some websocket messages! ⚠️" , # https://github.com/ITISFoundation/osparc-simcore/issues/6449
364
418
self .get_current_progress (),
365
419
)
420
+ self ._service_ready = True
366
421
return True
367
- self ._last_poll_timestamp = datetime .now (UTC )
422
+ self ._last_poll_timestamp = datetime .now (UTC )
368
423
369
424
return False
370
425
371
- def got_expected_node_progress_types (self ):
426
+ def got_expected_node_progress_types (self ) -> bool :
372
427
return all (
373
428
progress_type in self ._current_progress
374
429
for progress_type in NodeProgressType .required_types_for_started_service ()
@@ -377,9 +432,35 @@ def got_expected_node_progress_types(self):
377
432
def get_current_progress (self ):
378
433
return self ._current_progress .values ()
379
434
380
- def get_partial_product_url (self ):
435
+ def get_partial_product_url (self ) -> str :
381
436
return f"{ self .product_url } " .split ("//" )[1 ]
382
437
438
+ @property
439
+ def number_received_messages (self ) -> int :
440
+ return len (self ._received_messages )
441
+
442
+ def assert_service_ready (self ) -> None :
443
+ if not self ._service_ready :
444
+ with self .assertion_output_folder .joinpath ("websocket.json" ).open ("w" ) as f :
445
+ f .writelines ("[" )
446
+ f .writelines (
447
+ f"{ msg .to_json ()} ," for msg in self ._received_messages [:- 1 ]
448
+ )
449
+ f .writelines (
450
+ f"{ self ._received_messages [- 1 ].to_json ()} "
451
+ ) # no comma for last element
452
+ f .writelines ("]" )
453
+ assert self ._service_ready , (
454
+ f"the service failed and received { self .number_received_messages } websocket messages while waiting!"
455
+ "\n TIP: check websocket.log for detailed information in the test-results folder!"
456
+ )
457
+
458
+
459
+ _FAIL_FAST_COMPUTATIONAL_STATES : Final [tuple [RunningState , ...]] = (
460
+ RunningState .FAILED ,
461
+ RunningState .ABORTED ,
462
+ )
463
+
383
464
384
465
def wait_for_pipeline_state (
385
466
current_state : RunningState ,
@@ -397,13 +478,22 @@ def wait_for_pipeline_state(
397
478
f"pipeline is now in { current_state = } " ,
398
479
),
399
480
):
400
- waiter = SocketIOProjectStateUpdatedWaiter (expected_states = expected_states )
481
+ waiter = SocketIOProjectStateUpdatedWaiter (
482
+ expected_states = expected_states + _FAIL_FAST_COMPUTATIONAL_STATES
483
+ )
401
484
with websocket .expect_event (
402
485
"framereceived" , waiter , timeout = timeout_ms
403
486
) as event :
404
487
current_state = retrieve_project_state_from_decoded_message (
405
488
decode_socketio_42_message (event .value )
406
489
)
490
+ if (
491
+ current_state in _FAIL_FAST_COMPUTATIONAL_STATES
492
+ and current_state not in expected_states
493
+ ):
494
+ pytest .fail (
495
+ f"Pipeline failed with state { current_state } . Expected one of { expected_states } "
496
+ )
407
497
return current_state
408
498
409
499
@@ -437,6 +527,7 @@ def expected_service_running(
437
527
press_start_button : bool ,
438
528
product_url : AnyUrl ,
439
529
is_service_legacy : bool ,
530
+ assertion_output_folder : Path ,
440
531
) -> Generator [ServiceRunning , None , None ]:
441
532
with log_context (
442
533
logging .INFO , msg = f"Waiting for node to run. Timeout: { timeout } "
@@ -447,25 +538,16 @@ def expected_service_running(
447
538
product_url = product_url ,
448
539
api_request_context = page .request ,
449
540
is_service_legacy = is_service_legacy ,
541
+ assertion_output_folder = assertion_output_folder ,
450
542
)
451
543
service_running = ServiceRunning (iframe_locator = None )
452
544
453
- try :
454
- with websocket .expect_event ("framereceived" , waiter , timeout = timeout ):
455
- if press_start_button :
456
- _trigger_service_start (page , node_id )
457
-
458
- yield service_running
459
-
460
- except PlaywrightTimeoutError :
461
- if waiter .got_expected_node_progress_types ():
462
- ctx .logger .warning (
463
- "⚠️ Progress bar didn't receive 100 percent but all expected node-progress-types are in place: %s ⚠️" , # https://github.com/ITISFoundation/osparc-simcore/issues/6449
464
- waiter .get_current_progress (),
465
- )
466
- else :
467
- raise
545
+ with websocket .expect_event ("framereceived" , waiter , timeout = timeout ):
546
+ if press_start_button :
547
+ _trigger_service_start (page , node_id )
468
548
549
+ yield service_running
550
+ waiter .assert_service_ready ()
469
551
service_running .iframe_locator = page .frame_locator (
470
552
f'[osparc-test-id="iframe_{ node_id } "]'
471
553
)
@@ -480,6 +562,7 @@ def wait_for_service_running(
480
562
press_start_button : bool ,
481
563
product_url : AnyUrl ,
482
564
is_service_legacy : bool ,
565
+ assertion_output_folder : Path ,
483
566
) -> FrameLocator :
484
567
"""NOTE: if the service was already started this will not work as some of the required websocket events will not be emitted again
485
568
In which case this will need further adjutment"""
@@ -493,10 +576,13 @@ def wait_for_service_running(
493
576
product_url = product_url ,
494
577
api_request_context = page .request ,
495
578
is_service_legacy = is_service_legacy ,
579
+ assertion_output_folder = assertion_output_folder ,
496
580
)
497
581
with websocket .expect_event ("framereceived" , waiter , timeout = timeout ):
498
582
if press_start_button :
499
583
_trigger_service_start (page , node_id )
584
+
585
+ waiter .assert_service_ready ()
500
586
return page .frame_locator (f'[osparc-test-id="iframe_{ node_id } "]' )
501
587
502
588
0 commit comments