12
12
from aiodocker .containers import DockerContainer
13
13
from aiodocker .exceptions import DockerContainerError , DockerError
14
14
from packaging import version
15
- from tenacity import retry , stop_after_attempt
15
+ from tenacity import after_log , retry , stop_after_attempt
16
16
17
17
from celery .utils .log import get_task_logger
18
18
from servicelib .utils import fire_and_forget_task , logged_gather
@@ -117,7 +117,9 @@ async def preprocess(self):
117
117
self .shared_folders .create ()
118
118
host_name = config .SIDECAR_HOST_HOSTNAME_PATH .read_text ()
119
119
await self ._post_messages (LogType .LOG , f"[sidecar]Running on { host_name } " )
120
- results = await logged_gather (self ._process_task_inputs (), self ._pull_image ())
120
+ results = await logged_gather (
121
+ self ._process_task_inputs (), self ._pull_image (), reraise = True
122
+ )
121
123
await self ._write_input_file (results [0 ])
122
124
log .debug ("Pre-Processing Pipeline DONE" )
123
125
@@ -207,7 +209,9 @@ async def _write_input_file(self, inputs: Dict) -> None:
207
209
file_name .write_text (json .dumps (inputs ))
208
210
log .debug ("Writing input file DONE" )
209
211
210
- @retry (reraise = True , stop = stop_after_attempt (3 ))
212
+ @retry (
213
+ reraise = True , stop = stop_after_attempt (3 ), after = after_log (log , logging .WARNING )
214
+ )
211
215
async def _pull_image (self ):
212
216
docker_image = f"{ config .DOCKER_REGISTRY } /{ self .task .image ['name' ]} :{ self .task .image ['tag' ]} "
213
217
log .debug (
@@ -216,36 +220,33 @@ async def _pull_image(self):
216
220
config .DOCKER_USER ,
217
221
config .DOCKER_PASSWORD ,
218
222
)
219
- try :
220
- async with Docker () as docker_client :
221
- await self ._post_messages (
222
- LogType .LOG ,
223
- f"[sidecar]Pulling { self .task .image ['name' ]} :{ self .task .image ['tag' ]} ..." ,
224
- )
225
- await docker_client .images .pull (
226
- docker_image ,
227
- auth = {
228
- "username" : config .DOCKER_USER ,
229
- "password" : config .DOCKER_PASSWORD ,
230
- },
231
- )
232
223
233
- # get integration version
234
- image_cfg = await docker_client .images .inspect (docker_image )
235
- # NOTE: old services did not have that label
236
- if "io.simcore.integration-version" in image_cfg ["Config" ]["Labels" ]:
237
- self .integration_version = version .parse (
238
- json .loads (
239
- image_cfg ["Config" ]["Labels" ][
240
- "io.simcore.integration-version"
241
- ]
242
- )["integration-version" ]
243
- )
224
+ async with Docker () as docker_client :
225
+ await self ._post_messages (
226
+ LogType .LOG ,
227
+ f"[sidecar]Pulling { self .task .image ['name' ]} :{ self .task .image ['tag' ]} ..." ,
228
+ )
229
+
230
+ await docker_client .images .pull (
231
+ docker_image ,
232
+ auth = {
233
+ "username" : config .DOCKER_USER ,
234
+ "password" : config .DOCKER_PASSWORD ,
235
+ },
236
+ )
237
+
238
+ # get integration version
239
+ image_cfg = await docker_client .images .inspect (docker_image )
240
+ # NOTE: old services did not have that label
241
+ if "io.simcore.integration-version" in image_cfg ["Config" ]["Labels" ]:
242
+ self .integration_version = version .parse (
243
+ json .loads (
244
+ image_cfg ["Config" ]["Labels" ][
245
+ "io.simcore.integration-version"
246
+ ]
247
+ )["integration-version" ]
248
+ )
244
249
245
- except DockerError :
246
- msg = f"Failed to pull image '{ docker_image } '"
247
- log .exception (msg )
248
- raise
249
250
250
251
async def _create_container_config (self , docker_image : str ) -> Dict :
251
252
# NOTE: Env/Binds for log folder is only necessary for integraion "0"
0 commit comments