@@ -6,36 +6,27 @@
 from contextlib import contextmanager
 from datetime import datetime
 from pathlib import Path
-from typing import Dict, List, Union
+from typing import Dict, List, Union, Any
 
+import aio_pika
 import aiofiles
+import attr
 import docker
-import pika
 from celery.utils.log import get_task_logger
 from sqlalchemy import and_, exc
 
 from servicelib.utils import logged_gather
 from simcore_sdk import node_data, node_ports
-from simcore_sdk.models.pipeline_models import (
-    RUNNING,
-    SUCCESS,
-    ComputationalPipeline,
-    ComputationalTask,
-)
+from simcore_sdk.models.pipeline_models import (RUNNING, SUCCESS,
+                                                ComputationalPipeline,
+                                                ComputationalTask)
 from simcore_sdk.node_ports import log as node_port_log
 from simcore_sdk.node_ports.dbmanager import DBManager
 
 from . import config
-from .utils import (
-    DbSettings,
-    DockerSettings,
-    ExecutorSettings,
-    RabbitSettings,
-    S3Settings,
-    find_entry_point,
-    is_node_ready,
-    safe_channel,
-)
+from .rabbitmq import RabbitMQ
+from .utils import (DbSettings, DockerSettings, ExecutorSettings, S3Settings,
+                    find_entry_point, is_node_ready, safe_channel)
 
 log = get_task_logger(__name__)
 log.setLevel(config.SIDECAR_LOGLEVEL)
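
The hunk above swaps the blocking `pika` client for asyncio-native `aio_pika` (with a new `RabbitMQ` wrapper in `.rabbitmq`). For orientation, a minimal aio_pika publishing sketch; the broker URL and routing key are placeholders, not values from this repository's configuration:

```python
# Minimal aio_pika sketch (not the project's RabbitMQ wrapper).
import asyncio

import aio_pika


async def publish_log(message: bytes) -> None:
    # connect_robust re-establishes the connection if the broker drops
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()
        await channel.default_exchange.publish(
            aio_pika.Message(body=message), routing_key="sidecar.logs"
        )


asyncio.run(publish_log(b"hello from the sidecar"))
```
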
@@ -57,35 +48,17 @@ def session_scope(session_factory):
     finally:
         session.close()
 
-
+@attr.s(auto_attribs=True)
 class Sidecar:  # pylint: disable=too-many-instance-attributes
-    def __init__(self):
-        # publish subscribe config
-        self._pika = RabbitSettings()
-
-        # docker client config
-        self._docker = DockerSettings()
-
-        # object storage config
-        self._s3 = S3Settings()
-
-        # db config
-        self._db = DbSettings()  # keeps single db engine: sidecar.utils_{id}
-        self._db_manager = (
-            None  # lazy init because still not configured. SEE _get_node_ports
-        )
-
-        # current task
-        self._task = None
-
-        # current user id
-        self._user_id: str = None
-
-        # stack name
-        self._stack_name: str = None
-
-        # executor options
-        self._executor = ExecutorSettings()
+    _rabbit_mq: RabbitMQ
+    _docker: DockerSettings = DockerSettings()
+    _s3: S3Settings = S3Settings()
+    _db: DbSettings = DbSettings()  # keeps single db engine: sidecar.utils_{id}
+    _db_manager: Any = None  # lazy init because still not configured. SEE _get_node_ports
+    _task: ComputationalTask = None  # current task
+    _user_id: str = None  # current user id
+    _stack_name: str = None  # stack name
+    _executor: ExecutorSettings = ExecutorSettings()  # executor options
 
     async def _get_node_ports(self):
         if self._db_manager is None:
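
A note on the attrs conversion above: `@attr.s(auto_attribs=True)` generates `__init__` from the annotated class attributes, stripping the leading underscore from each keyword argument's name. A tiny sketch with a made-up class, not part of the sidecar:

```python
import attr


@attr.s(auto_attribs=True)
class Example:
    _name: str          # required; init argument is "name" (underscore stripped)
    _retries: int = 3   # optional, with a default


e = Example(name="sidecar", retries=5)
assert e._name == "sidecar" and e._retries == 5
```

One consequence worth knowing: a class-level default such as `DockerSettings()` is evaluated once at class definition and shared by all instances; `attr.Factory(DockerSettings)` would build a fresh instance per object.
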
@@ -211,27 +184,51 @@ async def _post_progress(self, channel, progress):
     async def log_file_processor(self, log_file: Path) -> None:
         """checks both container logs and the log_file if any
         """
-        try:
-            TIME_BETWEEN_LOGS_S: int = 2
-            time_logs_sent = time.monotonic()
-            accumulated_logs = []
-            async with aiofiles.open(log_file, mode="r") as fp:
-                async for line in fp:
-                    now = time.monotonic()
-                    accumulated_logs.append(line)
-                    if (now - time_logs_sent) < TIME_BETWEEN_LOGS_S:
-                        continue
-                    # send logs to rabbitMQ
-                    # TODO: NEEDS to shield??
-                    with safe_channel(self._pika) as (channel, _):
-                        await self._post_log(channel, msg=accumulated_logs)
-                        time_logs_sent = now
-                        accumulated_logs = []
-        except asyncio.CancelledError:
-            # the task is complete let's send the last logs
-            if accumulated_logs:
-                with safe_channel(self._pika) as (channel, _):
-                    await self._post_log(channel, msg=accumulated_logs)
+        # async def parse_line(line: str) -> None:
+        #     # TODO: This should be 'settings', a regex for every service
+        #     if line.lower().startswith("[progress]"):
+        #         progress = line.lower().lstrip(
+        #             "[progress]").rstrip("%").strip()
+        #         await self._post_progress(channel, progress)
+        #         log.debug('PROGRESS %s', progress)
+        #     elif "percent done" in line.lower():
+        #         progress = line.lower().rstrip("percent done")
+        #         try:
+        #             float_progress = float(progress) / 100.0
+        #             progress = str(float_progress)
+        #             await self._post_progress(channel, progress)
+        #             log.debug('PROGRESS %s', progress)
+        #         except ValueError:
+        #             log.exception("Could not extract progress from solver")
+        #     else:
+        #         # just send as log
+        #         await self._post_log(channel, msg=line)
+
+
+
+
+        # try:
+        #     import pdb; pdb.set_trace()
+        #     TIME_BETWEEN_LOGS_S: int = 2
+        #     time_logs_sent = time.monotonic()
+        #     accumulated_logs = []
+        #     async with aiofiles.open(log_file, mode="r") as fp:
+        #         async for line in fp:
+        #             now = time.monotonic()
+        #             accumulated_logs.append(line)
+        #             if (now - time_logs_sent) < TIME_BETWEEN_LOGS_S:
+        #                 continue
+        #             # send logs to rabbitMQ
+        #             # TODO: NEEDS to shield??
+        #             with safe_channel(self._pika) as (channel, _):
+        #                 await self._post_log(channel, msg=accumulated_logs)
+        #                 time_logs_sent = now
+        #                 accumulated_logs = []
+        # except asyncio.CancelledError:
+        #     # the task is complete let's send the last logs
+        #     if accumulated_logs:
+        #         with safe_channel(self._pika) as (channel, _):
+        #             await self._post_log(channel, msg=accumulated_logs)
 
     # async def _bg_job(self, log_file):
     #     log.debug('Bck job started %s:node %s:internal id %s from container',
@@ -568,7 +565,7 @@ async def postprocess(self):
         finally:
             _session.close()
 
-    async def inspect(self, celery_task, user_id: str, project_id: str, node_id: str):
+    async def inspect(self, job_request_id: int, user_id: str, project_id: str, node_id: str):
         log.debug(
             "ENTERING inspect with user %s pipeline:node %s: %s",
             user_id,
@@ -657,4 +654,3 @@ async def inspect(self, celery_task, user_id: str, project_id: str, node_id: str
 SIDECAR = Sidecar()
 
 __all__ = ["SIDECAR"]
-