Skip to content

Fix garbage collection #1480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions services/web/client/source/class/osparc/io/WatchDog.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,56 @@ qx.Class.define("osparc.io.WatchDog", {

type : "singleton",

construct: function() {
this.__clientHeartbeatPinger = new qx.event.Timer(this.heartbeatInterval);
this.__clientHeartbeatPinger.addListener("interval", function() {
const socket = osparc.wrapper.WebSocket.getInstance();
try {
socket.emit("client_heartbeat");
} catch (error) {
// no need to handle the error, nor does it need to cause further issues
// it is ok to eat it up
}
}, this);

// register for socket.io event to change the default heartbeat interval
const socket = osparc.wrapper.WebSocket.getInstance();
const socketIoEventName = "set_heartbeat_emit_interval";
socket.removeSlot(socketIoEventName);
socket.on(socketIoEventName, function(emitIntervalSeconds) {
this.setHeartbeatInterval(parseInt(emitIntervalSeconds) * 1000);
}, this);
},

properties: {
onLine: {
check: "Boolean",
init: false,
nullable: false,
apply: "_applyOnLine"
},

heartbeatInterval: {
check: "Number",
init: 2 * 1000, // in milliseconds
nullable: false,
apply: "_applyHeartbeatInterval"
}
},

members: {
__clientHeartbeatPinger: null,

_applyOnLine: function(value) {
let logo = osparc.component.widget.LogoOnOff.getInstance();
if (logo) {
logo.setOnLine(value);
}
value ? this.__clientHeartbeatPinger.start() : this.__clientHeartbeatPinger.stop();
},

_applyHeartbeatInterval: function(value) {
this.__clientHeartbeatPinger.setInterval(value);
}
} // members
});
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def start_project_interactive_services(


async def delete_project(request: web.Request, project_uuid: str, user_id: int) -> None:
await delete_project_from_db(request, project_uuid, user_id)
await delete_project_from_db(request.app, project_uuid, user_id)

async def remove_services_and_data():
await remove_project_interactive_services(user_id, project_uuid, request.app)
Expand Down Expand Up @@ -173,13 +173,13 @@ async def delete_project_data(


async def delete_project_from_db(
request: web.Request, project_uuid: str, user_id: int
app: web.Application, project_uuid: str, user_id: int
) -> None:
db = request.config_dict[APP_PROJECT_DBAPI]
await delete_pipeline_db(request.app, project_uuid)
db = app[APP_PROJECT_DBAPI]
await delete_pipeline_db(app, project_uuid)
await db.delete_user_project(user_id, project_uuid)
# requests storage to delete all project's stored data
await delete_data_folders_of_project(request.app, project_uuid, user_id)
await delete_data_folders_of_project(app, project_uuid, user_id)


async def add_project_node(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

from .config import APP_GARBAGE_COLLECTOR_KEY, get_garbage_collector_interval
from .registry import RedisResourceRegistry, get_registry
from simcore_service_webserver.projects.projects_api import delete_project_from_db
from simcore_service_webserver.users_api import is_user_guest, delete_user
from simcore_service_webserver.projects.projects_exceptions import ProjectNotFoundError

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,6 +72,29 @@ async def collect_garbage(registry: RedisResourceRegistry, app: web.Application)
app=app,
)

await remove_resources_if_guest_user(
app=app, project_uuid=resource_value, user_id=int(key["user_id"])
)


async def remove_resources_if_guest_user(
app: web.Application, project_uuid: str, user_id: int
) -> None:
"""When a guest user finishes using the platform its Posgtres
and S3/MinIO entries need to be removed
"""
logger.debug(
"Removing project '%s' from the database", project_uuid,
)
try:
await delete_project_from_db(app, project_uuid, user_id)
except ProjectNotFoundError:
logging.warning("Project '%s' not found, skipping removal", project_uuid)

logger.debug("Will try to remove resources for user '%s' if GUEST", user_id)
if await is_user_guest(app, user_id):
await delete_user(app, user_id)


async def garbage_collector_task(app: web.Application):
logger.info("Starting garbage collector...")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import asyncio
import logging
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Any

from aiohttp import web

Expand All @@ -17,9 +17,11 @@
from socketio.exceptions import ConnectionRefusedError as SocketIOConnectionError

from ..login.decorators import RQT_USERID_KEY, login_required
from ..resource_manager.websocket_manager import managed_resource
from ..resource_manager.websocket_manager import managed_resource, get_registry
from ..resource_manager.config import get_service_deletion_timeout
from .config import get_socket_server
from .handlers_utils import register_socketio_handler
from .events import post_messages

ANONYMOUS_USER_ID = -1
_SOCKET_IO_AIOHTTP_REQUEST_KEY = "aiohttp.request"
Expand Down Expand Up @@ -48,6 +50,15 @@ async def connect(sid: str, environ: Dict, app: web.Application) -> bool:
except Exception as exc: # pylint: disable=broad-except
raise SocketIOConnectionError(f"Unexpected error: {exc}")

# Send service_deletion_timeout to client
# the interval should be < get_service_deletion_timeout(app) to avoid
# issues, assuming half of the interval and not less the 2 seconds
emit_interval: int = max(2, get_service_deletion_timeout(app) // 2)
log.info("Sending set_heartbeat_emit_interval with %s", emit_interval)

user_id = request.get(RQT_USERID_KEY, ANONYMOUS_USER_ID)
await post_messages(app, user_id, {'set_heartbeat_emit_interval': emit_interval})

return True


Expand Down Expand Up @@ -135,3 +146,22 @@ async def disconnect(sid: str, app: web.Application) -> None:
sid,
str(socketio_session),
)

@register_socketio_handler
async def client_heartbeat(sid: str, _: Any, app: web.Application) -> None:
"""JS client invokes this handler to signal its presence.

Each time this event is received the alive key's TTL is updated in
Redis. Once the key expires, resources will be garbage collected.

Arguments:
sid {str} -- the socket ID
_ {Any} -- the data is ignored for this handler
app {web.Application} -- the aiohttp app
"""
sio = get_socket_server(app)
async with sio.session(sid) as socketio_session:
registry = get_registry(app)
await registry.set_key_alive(
socketio_session, False, get_service_deletion_timeout(app)
)
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async def access_study(request: web.Request) -> web.Response:
raise RuntimeError("Unable to start user session")

log.debug(
"Granted access to study '%d' for user %s. Copying study over ...",
"Granted access to study '%s' for user %s. Copying study over ...",
template_project.get("name"),
user.get("email"),
)
Expand Down
31 changes: 31 additions & 0 deletions services/web/server/src/simcore_service_webserver/users_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging

from aiohttp import web
from simcore_postgres_database.models.users import UserRole
from simcore_service_webserver.login.cfg import get_storage

logger = logging.getLogger(__name__)


async def is_user_guest(app: web.Application, user_id: int) -> bool:
"""Returns True if the user exists and is a GUEST"""
db = get_storage(app)
user = await db.get_user({"id": user_id})
if not user:
logger.warning("Could not find user with id '%s'", user_id)
return False

return UserRole(user["role"]) == UserRole.GUEST


async def delete_user(app: web.Application, user_id: int) -> None:
"""Deletes a user from the database if the user exists"""
db = get_storage(app)
user = await db.get_user({"id": user_id})
if not user:
logger.warning(
"User with id '%s' could not be deleted because it does not exist", user_id
)
return

await db.delete_user(user)
9 changes: 9 additions & 0 deletions services/web/server/tests/unit/with_dbs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,12 @@ async def create(user_id, project_id) -> Dict:
return running_service_dict

return create

@pytest.fixture
def asyncpg_storage_system_mock(mocker):
mocked_method = mocker.patch(
"simcore_service_webserver.login.storage.AsyncpgStorage.delete_user",
return_value=Future(),
)
mocked_method.return_value.set_result("")
return mocked_method
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def client(loop, aiohttp_client, app_cfg, postgres_service):
)
)


@pytest.fixture()
async def logged_user(client, user_role: UserRole):
""" adds a user in db and logs in with client
Expand Down Expand Up @@ -322,6 +321,7 @@ async def test_interactive_services_removed_after_logout(
mocked_dynamic_service,
client_session_id,
socketio_client,
storage_subsystem_mock, # when guest user logs out garbage is collected
):
set_service_deletion_delay(SERVICE_DELETION_DELAY, client.server.app)
# login - logged_user fixture
Expand Down Expand Up @@ -365,6 +365,7 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t
mocked_dynamic_service,
socketio_client,
client_session_id,
storage_subsystem_mock, # when guest user logs out garbage is collected
):

set_service_deletion_delay(SERVICE_DELETION_DELAY, client.server.app)
Expand Down Expand Up @@ -435,6 +436,8 @@ async def test_interactive_services_removed_per_project(
mocked_dynamic_service,
socketio_client,
client_session_id,
asyncpg_storage_system_mock,
storage_subsystem_mock, # when guest user logs out garbage is collected
):
set_service_deletion_delay(SERVICE_DELETION_DELAY, client.server.app)
# create server with delay set to DELAY
Expand Down