Skip to content

reduce traefik latency check, and remove unnecessary warnings when a pipeline does not exist #2224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 23, 2021
Merged
8 changes: 4 additions & 4 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ services:
- traefik.http.middlewares.${SWARM_STACK_NAME_NO_HYPHEN}_sslheader.headers.customrequestheaders.X-Forwarded-Proto=http
- traefik.enable=true
- traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.server.port=8080
- traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.healthcheck.path=/v0/health
- traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.healthcheck.path=/v0/
- traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.healthcheck.interval=1000ms
- traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.healthcheck.timeout=75ms
- traefik.http.middlewares.${SWARM_STACK_NAME}_webserver_retry.retry.attempts=3
- traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.healthcheck.timeout=100ms
- traefik.http.middlewares.${SWARM_STACK_NAME}_webserver_retry.retry.attempts=2
- traefik.http.routers.${SWARM_STACK_NAME}_webserver.rule=hostregexp(`{host:.+}`)
- traefik.http.routers.${SWARM_STACK_NAME}_webserver.entrypoints=http
- traefik.http.routers.${SWARM_STACK_NAME}_webserver.priority=1
Expand Down Expand Up @@ -331,7 +331,7 @@ services:
- "--ping=true"
- "--entryPoints.ping.address=:9082"
- "--ping.entryPoint=ping"
- "--log.level=WARNING"
- "--log.level=INFO"
- "--accesslog=false"
- "--metrics.prometheus=true"
- "--metrics.prometheus.addEntryPointsLabels=true"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ async def welcome_banner(_app: web.Application):

app.on_startup.append(welcome_banner)

web.run_app(app, host=config["main"]["host"], port=config["main"]["port"])
web.run_app(
app,
host=config["main"]["host"],
port=config["main"]["port"],
access_log_format='%a %t "%r" %s %b [%Dus] "%{Referer}i" "%{User-Agent}i"',
)


__all__ = ("create_application", "run_service")
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ async def get_computation_task(
task_out = ComputationTask.parse_obj(computation_task_out_dict)
return task_out
except _DirectorServiceError as exc:
if exc.status == web.HTTPNotFound.status_code:
# the pipeline might not exist and that is ok
return
log.warning("getting pipeline for project %s failed: %s.", project_id, exc)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
- Shall be used as entry point for all the queries to the database regarding projects

"""

import asyncio
import logging
import textwrap
import uuid as uuidlib
Expand All @@ -22,6 +22,8 @@
from aiopg.sa.connection import SAConnection
from aiopg.sa.result import ResultProxy, RowProxy
from change_case import ChangeCase
from jsonschema.exceptions import ValidationError
from models_library.projects import ProjectAtDB
from servicelib.application_keys import APP_DB_ENGINE_KEY
from simcore_postgres_database.webserver_models import ProjectType, projects
from sqlalchemy import literal_column
Expand All @@ -37,6 +39,7 @@
ProjectsException,
)
from .projects_fakes import Fake
from .projects_utils import project_uses_available_services

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -307,7 +310,11 @@ async def add_project(
prj["tags"] = []
return prj

async def load_user_projects(self, user_id: int) -> List[Dict]:
async def load_user_projects(
self,
user_id: int,
filter_by_services: Optional[List[Dict]] = None,
) -> List[Dict]:
log.info("Loading projects for user %s", user_id)
async with self.engine.acquire() as conn:
user_groups: List[RowProxy] = await self.__load_user_groups(conn, user_id)
Expand All @@ -330,13 +337,17 @@ async def load_user_projects(self, user_id: int) -> List[Dict]:
"""
)
projects_list = await self.__load_projects(
conn, query, user_id, user_groups
conn, query, user_id, user_groups, filter_by_services=filter_by_services
)

return projects_list

async def load_template_projects(
self, user_id: int, *, only_published=False
self,
user_id: int,
*,
only_published=False,
filter_by_services: Optional[List[Dict]] = None,
) -> List[Dict]:
log.info("Loading public template projects")

Expand All @@ -358,7 +369,9 @@ async def load_template_projects(
"""
)

db_projects = await self.__load_projects(conn, query, user_id, user_groups)
db_projects = await self.__load_projects(
conn, query, user_id, user_groups, filter_by_services
)

projects_list.extend(db_projects)

Expand All @@ -378,7 +391,12 @@ async def __load_user_groups(
return user_groups

async def __load_projects(
self, conn: SAConnection, query: str, user_id: int, user_groups: List[RowProxy]
self,
conn: SAConnection,
query: str,
user_id: int,
user_groups: List[RowProxy],
filter_by_services: Optional[List[Dict]] = None,
) -> List[Dict]:
api_projects: List[Dict] = [] # API model-compatible projects
db_projects: List[Dict] = [] # DB model-compatible projects
Expand All @@ -387,8 +405,21 @@ async def __load_projects(
_check_project_permissions(row, user_id, user_groups, "read")
except ProjectInvalidRightsError:
continue
try:
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor() as pool:
await asyncio.get_event_loop().run_in_executor(
pool, ProjectAtDB.from_orm, row
)
# ProjectAtDB.from_orm(row)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MNIOR: remove commented

Q: does it really make a difference making it async? Did you measure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still getting "some" 503 when the system more so I took no chance.

except ValidationError as exc:
log.warning("project failed validation: %s", exc)
continue
prj = dict(row.items())
log.debug("found project: %s", pformat(prj))
if filter_by_services:
if not await project_uses_available_services(prj, filter_by_services):
continue
db_projects.append(prj)

# NOTE: DO NOT nest _get_tags_by_project in async loop above !!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,24 @@ async def set_all_project_states(projects: List[Dict[str, Any]], is_template: bo
reraise=True,
)

user_available_services: List[
Dict
] = await catalog.get_services_for_user_in_product(
request.app, user_id, product_name, only_key_versions=True
)

projects_list = []
if ptype in ("template", "all"):
template_projects = await db.load_template_projects(user_id=user_id)
template_projects = await db.load_template_projects(
user_id=user_id, filter_by_services=user_available_services
)
await set_all_project_states(template_projects, is_template=True)
projects_list += template_projects

if ptype in ("user", "all"): # standard only (notice that templates will only)
user_projects = await db.load_user_projects(user_id=user_id)
user_projects = await db.load_user_projects(
user_id=user_id, filter_by_services=user_available_services
)
await set_all_project_states(user_projects, is_template=False)
projects_list += user_projects

Expand All @@ -158,32 +168,7 @@ async def set_all_project_states(projects: List[Dict[str, Any]], is_template: bo

stop = min(start + count, len(projects_list))
projects_list = projects_list[start:stop]
user_available_services: List[
Dict
] = await catalog.get_services_for_user_in_product(
request.app, user_id, product_name, only_key_versions=True
)

# validate response
async def validate_project(prj: Dict[str, Any]) -> Dict[str, Any]:
try:
projects_api.validate_project(request.app, prj)
if await project_uses_available_services(prj, user_available_services):
return prj
except ValidationError:
log.warning(
"Invalid project with id='%s' in database."
"Skipping project from listed response."
"RECOMMENDED db data diagnose and cleanup",
prj.get("uuid", "undefined"),
)

validation_tasks = [validate_project(project) for project in projects_list]
# FIXME: if some invalid, then it should not reraise but instead
results = await logged_gather(*validation_tasks, reraise=True)
validated_projects = [r for r in results if r]

return {"data": validated_projects}
return {"data": projects_list}


@login_required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pathlib import Path
from pprint import pprint
from typing import Dict, List, Optional, Union
from uuid import uuid4

import pytest
import sqlalchemy as sa
Expand Down Expand Up @@ -265,10 +266,12 @@ async def test_workflow(
modified_project = deepcopy(projects[0])
modified_project["name"] = "some other name"
modified_project["description"] = "John Raynor killed Kerrigan"
modified_project["workbench"]["ReNamed"] = modified_project["workbench"].pop(

new_node_id = str(uuid4())
modified_project["workbench"][new_node_id] = modified_project["workbench"].pop(
list(modified_project["workbench"].keys())[0]
)
modified_project["workbench"]["ReNamed"]["position"]["x"] = 0
modified_project["workbench"][new_node_id]["position"]["x"] = 0
# share with some group
modified_project["accessRights"].update(
{str(standard_groups[0]["gid"]): {"read": True, "write": True, "delete": False}}
Expand Down