Skip to content

🎨E2E: Fail fast conditions + websocket logging in case of error #7463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 118 additions & 32 deletions packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# pylint:disable=unused-variable
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name
# pylint:disable=too-many-instance-attributes

import contextlib
import json
import logging
Expand All @@ -8,12 +13,25 @@
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import Enum, unique
from pathlib import Path
from typing import Any, Final

import pytest
from playwright._impl._sync_base import EventContextManager
from playwright.sync_api import APIRequestContext, FrameLocator, Locator, Page, Request
from playwright.sync_api import (
APIRequestContext,
)
from playwright.sync_api import Error as PlaywrightError
from playwright.sync_api import (
FrameLocator,
Locator,
Page,
Request,
)
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import WebSocket
from playwright.sync_api import (
WebSocket,
)
from pydantic import AnyUrl

from .logging_tools import log_context
Expand Down Expand Up @@ -63,12 +81,15 @@ class NodeProgressType(str, Enum):
# NOTE: this is a partial duplicate of models_library/rabbitmq_messages.py
# It must remain as such until that module is pydantic V2 compatible
CLUSTER_UP_SCALING = "CLUSTER_UP_SCALING"
SERVICE_INPUTS_PULLING = "SERVICE_INPUTS_PULLING"
SIDECARS_PULLING = "SIDECARS_PULLING"
SERVICE_INPUTS_PULLING = "SERVICE_INPUTS_PULLING"
SERVICE_OUTPUTS_PULLING = "SERVICE_OUTPUTS_PULLING"
SERVICE_STATE_PULLING = "SERVICE_STATE_PULLING"
SERVICE_IMAGES_PULLING = "SERVICE_IMAGES_PULLING"
SERVICE_CONTAINERS_STARTING = "SERVICE_CONTAINERS_STARTING"
SERVICE_STATE_PUSHING = "SERVICE_STATE_PUSHING"
SERVICE_OUTPUTS_PUSHING = "SERVICE_OUTPUTS_PUSHING"
PROJECT_CLOSING = "PROJECT_CLOSING"

@classmethod
def required_types_for_started_service(cls) -> set["NodeProgressType"]:
Expand All @@ -94,6 +115,7 @@ class _OSparcMessages(str, Enum):
SERVICE_DISK_USAGE = "serviceDiskUsage"
WALLET_OSPARC_CREDITS_UPDATED = "walletOsparcCreditsUpdated"
LOGGER = "logger"
SERVICE_STATUS = "serviceStatus"


@dataclass(frozen=True, slots=True, kw_only=True)
Expand All @@ -107,6 +129,9 @@ class SocketIOEvent:
name: str
obj: dict[str, Any]

def to_json(self) -> str:
    """Serialize this event as a JSON object string with the keys ``name`` and ``obj``."""
    payload = {"name": self.name, "obj": self.obj}
    return json.dumps(payload)


SOCKETIO_MESSAGE_PREFIX: Final[str] = "42"

Expand Down Expand Up @@ -286,23 +311,45 @@ def __call__(self, message: str) -> None:
print("WS Message:", decoded_message.name, decoded_message.obj)


# Dynamic-service states that make SocketIONodeProgressCompleteWaiter stop waiting
# right away when they show up in a serviceStatus message for the watched node.
_FAIL_FAST_DYNAMIC_SERVICE_STATES: Final[tuple[str, ...]] = ("idle", "failed")


@dataclass
class SocketIONodeProgressCompleteWaiter:
node_id: str
logger: logging.Logger
product_url: AnyUrl
api_request_context: APIRequestContext
is_service_legacy: bool
assertion_output_folder: Path
_current_progress: dict[NodeProgressType, float] = field(
default_factory=defaultdict
)
_last_poll_timestamp: datetime = field(default_factory=lambda: datetime.now(tz=UTC))
_received_messages: list[SocketIOEvent] = field(default_factory=list)
_service_ready: bool = False

def __call__(self, message: str) -> bool:
# socket.io encodes messages like so
# https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message = decode_socketio_42_message(message)
self._received_messages.append(decoded_message)
if (
(decoded_message.name == _OSparcMessages.SERVICE_STATUS.value)
and (decoded_message.obj["service_uuid"] == self.node_id)
and (
decoded_message.obj["service_state"]
in _FAIL_FAST_DYNAMIC_SERVICE_STATES
)
):
# NOTE: this is a fail fast for dynamic services that fail to start
self.logger.error(
"node %s failed with state %s, failing fast",
self.node_id,
decoded_message.obj["service_state"],
)
return True
if decoded_message.name == _OSparcMessages.NODE_PROGRESS.value:
node_progress_event = retrieve_node_progress_from_decoded_message(
decoded_message
Expand All @@ -327,48 +374,56 @@ def __call__(self, message: str) -> bool:
len(NodeProgressType.required_types_for_started_service()),
f"{json.dumps({k: round(v, 2) for k, v in self._current_progress.items()})}",
)

return self.got_expected_node_progress_types() and all(
self._service_ready = self.got_expected_node_progress_types() and all(
round(progress, 1) == 1.0
for progress in self._current_progress.values()
)
return self._service_ready

_current_timestamp = datetime.now(UTC)
if _current_timestamp - self._last_poll_timestamp > timedelta(seconds=5):
# NOTE: we might have missed some websocket messages, and we check if the service is ready
if self.is_service_legacy:
url = f"https://{self.get_partial_product_url()}x/{self.node_id}/"
else:
url = (
f"https://{self.node_id}.services.{self.get_partial_product_url()}"
)
with contextlib.suppress(PlaywrightTimeoutError):
response = self.api_request_context.get(url, timeout=1000)
level = logging.DEBUG
if (response.status >= 400) and (response.status not in (502, 503)):
level = logging.ERROR
response = None
with contextlib.suppress(
PlaywrightTimeoutError, TimeoutError, PlaywrightError
):
response = self.api_request_context.get(url, timeout=5000)
if response:
self.logger.log(
level,
(
logging.ERROR
if (response.status >= 400)
and (response.status not in (502, 503))
else logging.DEBUG
),
"Querying service endpoint in case we missed some websocket messages. Url: %s Response: '%s' TIP: %s",
url,
f"{response.status}: {response.text()}",
(
"We are emulating the frontend; a 5XX response is acceptable if the service is not yet ready."
"We are emulating the frontend; a 502/503 response is acceptable if the service is not yet ready."
),
)

if response.status <= 400:
# NOTE: If the response status is less than 400, it means that the backend is ready (There are some services that respond with a 3XX)
# NOTE: If the response status is less than 400, it means that the service is ready (There are some services that respond with a 3XX)
if self.got_expected_node_progress_types():
self.logger.warning(
"⚠️ Progress bar didn't receive 100 percent but service is already running: %s. TIP: we missed some websocket messages! ⚠️", # https://github.com/ITISFoundation/osparc-simcore/issues/6449
self.get_current_progress(),
)
self._service_ready = True
return True
self._last_poll_timestamp = datetime.now(UTC)
self._last_poll_timestamp = datetime.now(UTC)

return False

def got_expected_node_progress_types(self):
def got_expected_node_progress_types(self) -> bool:
return all(
progress_type in self._current_progress
for progress_type in NodeProgressType.required_types_for_started_service()
Expand All @@ -377,9 +432,35 @@ def got_expected_node_progress_types(self):
def get_current_progress(self):
    """Return a view of the progress values recorded so far, one per progress type seen."""
    recorded = self._current_progress
    return recorded.values()

def get_partial_product_url(self):
def get_partial_product_url(self) -> str:
    """Return the product URL without its scheme: the segment right after the first ``//``."""
    url_pieces = f"{self.product_url}".split("//")
    return url_pieces[1]

@property
def number_received_messages(self) -> int:
    """Number of socket.io events currently stored in ``_received_messages``."""
    stored_events = self._received_messages
    return len(stored_events)

def assert_service_ready(self) -> None:
    """Assert that the service was detected as ready, dumping websocket traffic otherwise.

    When the service is not ready, every received socket.io event is written as a
    JSON array to ``websocket.json`` inside ``assertion_output_folder`` before the
    assertion error is raised.

    Raises:
        AssertionError: if the service was never flagged as ready.
    """
    if self._service_ready:
        return

    # The dump must never mask the real failure: make sure the folder exists and
    # cope with an empty message list (joining nothing yields a valid "[]").
    self.assertion_output_folder.mkdir(parents=True, exist_ok=True)
    dump_path = self.assertion_output_folder.joinpath("websocket.json")
    serialized_messages = ",".join(
        message.to_json() for message in self._received_messages
    )
    dump_path.write_text(f"[{serialized_messages}]", encoding="utf-8")

    failure_message = (
        f"the service failed and received {self.number_received_messages} websocket messages while waiting!"
        "\nTIP: check websocket.json for detailed information in the test-results folder!"
    )
    raise AssertionError(failure_message)


# Pipeline states that wait_for_pipeline_state also listens for, so that it can
# fail the test immediately when one of them arrives and was not expected.
_FAIL_FAST_COMPUTATIONAL_STATES: Final[tuple[RunningState, ...]] = (
RunningState.FAILED,
RunningState.ABORTED,
)


def wait_for_pipeline_state(
current_state: RunningState,
Expand All @@ -397,13 +478,22 @@ def wait_for_pipeline_state(
f"pipeline is now in {current_state=}",
),
):
waiter = SocketIOProjectStateUpdatedWaiter(expected_states=expected_states)
waiter = SocketIOProjectStateUpdatedWaiter(
expected_states=expected_states + _FAIL_FAST_COMPUTATIONAL_STATES
)
with websocket.expect_event(
"framereceived", waiter, timeout=timeout_ms
) as event:
current_state = retrieve_project_state_from_decoded_message(
decode_socketio_42_message(event.value)
)
if (
current_state in _FAIL_FAST_COMPUTATIONAL_STATES
and current_state not in expected_states
):
pytest.fail(
f"Pipeline failed with state {current_state}. Expected one of {expected_states}"
)
return current_state


Expand Down Expand Up @@ -437,6 +527,7 @@ def expected_service_running(
press_start_button: bool,
product_url: AnyUrl,
is_service_legacy: bool,
assertion_output_folder: Path,
) -> Generator[ServiceRunning, None, None]:
with log_context(
logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}"
Expand All @@ -447,25 +538,16 @@ def expected_service_running(
product_url=product_url,
api_request_context=page.request,
is_service_legacy=is_service_legacy,
assertion_output_folder=assertion_output_folder,
)
service_running = ServiceRunning(iframe_locator=None)

try:
with websocket.expect_event("framereceived", waiter, timeout=timeout):
if press_start_button:
_trigger_service_start(page, node_id)

yield service_running

except PlaywrightTimeoutError:
if waiter.got_expected_node_progress_types():
ctx.logger.warning(
"⚠️ Progress bar didn't receive 100 percent but all expected node-progress-types are in place: %s ⚠️", # https://github.com/ITISFoundation/osparc-simcore/issues/6449
waiter.get_current_progress(),
)
else:
raise
with websocket.expect_event("framereceived", waiter, timeout=timeout):
if press_start_button:
_trigger_service_start(page, node_id)

yield service_running
waiter.assert_service_ready()
service_running.iframe_locator = page.frame_locator(
f'[osparc-test-id="iframe_{node_id}"]'
)
Expand All @@ -480,6 +562,7 @@ def wait_for_service_running(
press_start_button: bool,
product_url: AnyUrl,
is_service_legacy: bool,
assertion_output_folder: Path,
) -> FrameLocator:
"""NOTE: if the service was already started this will not work as some of the required websocket events will not be emitted again
In which case this will need further adjustment"""
Expand All @@ -493,10 +576,13 @@ def wait_for_service_running(
product_url=product_url,
api_request_context=page.request,
is_service_legacy=is_service_legacy,
assertion_output_folder=assertion_output_folder,
)
with websocket.expect_event("framereceived", waiter, timeout=timeout):
if press_start_button:
_trigger_service_start(page, node_id)

waiter.assert_service_ready()
return page.frame_locator(f'[osparc-test-id="iframe_{node_id}"]')


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Final, TypedDict

import arrow
Expand Down Expand Up @@ -107,6 +108,7 @@ def wait_for_launched_s4l(
copy_workspace: bool,
product_url: AnyUrl,
is_service_legacy: bool,
assertion_output_folder: Path,
) -> WaitForS4LDict:
with log_context(logging.INFO, "launch S4L") as ctx:
predicate = S4LWaitForWebsocket(logger=ctx.logger)
Expand Down Expand Up @@ -134,6 +136,7 @@ def wait_for_launched_s4l(
press_start_button=False,
product_url=product_url,
is_service_legacy=is_service_legacy,
assertion_output_folder=assertion_output_folder,
)
s4l_websocket = ws_info.value
ctx.logger.info("acquired S4L websocket!")
Expand Down
Loading
Loading