Skip to content

[wrapper.py] ジョブが完了するまで待つメソッド wait_for_completion を追加 #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion annofabapi/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.15.2'
__version__ = '0.15.3'
48 changes: 48 additions & 0 deletions annofabapi/wrapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import logging
import mimetypes
import time
import urllib
import urllib.parse
from typing import Any, Callable, Dict, List, Optional, Tuple # pylint: disable=unused-import
Expand Down Expand Up @@ -789,3 +790,50 @@ def get_all_project_job(self, project_id: str, query_params: Dict[str, Any]) ->
r = self.api.get_project_job(project_id, query_params=copied_params)[0]
all_jobs.extend(r["list"])
return all_jobs

def wait_for_completion(self, project_id: str, job_type: str, job_access_interval: int = 60,
max_job_access: int = 10) -> bool:
"""
ジョブが完了するまで待つ。

Args:
project_id: プロジェクトID
job_type: 取得するジョブ種別
job_access_interval: ジョブにアクセスする間隔[sec]
max_job_access: ジョブに最大何回アクセスするか

Returns:
True: ジョブが成功した or 実行中のジョブがない。
False: ジョブが失敗 or ``max_job_access`` 回アクセスしても、ジョブが完了しなかった。

"""
def get_latest_job():
job_list = self.api.get_project_job(project_id, query_params={"type": job_type})[0]["list"]
assert len(job_list) == 1
return job_list[0]

job_access_count = 0
while True:
job = get_latest_job()
if job_access_count == 0 and job["job_status"] != "progress":
logger.debug(f"進行中のジョブはありませんでした。")
return True

job_access_count += 1

if job["job_status"] == "succeeded":
logger.debug(f"job_id = {job['job_id']} のジョブが成功しました。")
return True

elif job["job_status"] == "failed":
logger.info(f"job_id = {job['job_id']} のジョブが失敗しました。")
return False

else:
# 進行中
if job_access_count < max_job_access:
logger.debug(f"job_id = {job['job_id']} のジョブが進行中です。{job_access_interval} 秒間待ちます。")
time.sleep(job_access_interval)
else:
logger.debug(f"job_id = {job['job_id']} のジョブに {job_access_interval} 回アクセスしましたが、完了しませんでした。")
return False
15 changes: 10 additions & 5 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,17 @@ def test_instruction():
api.delete_instruction_image(project_id, test_image_id)


def test_job():
print("get_project_job in wrapper.get_all_project_job")
assert len(wrapper.get_all_project_job(project_id, {"type": "gen-inputs"})) >= 0
class TestJob:
def test_wait_for_completion(self):
# 実行中のジョブはないので、必ずTrue
result = wrapper.wait_for_completion(project_id, "gen-tasks", job_access_interval=1, max_job_access=1)
assert result == True

print("wrapper.delete_all_succeeded_job")
assert len(wrapper.delete_all_succeeded_job(project_id, "gen-tasks")) >= 0
def test_get_all_project_job(self):
assert len(wrapper.get_all_project_job(project_id, {"type": "gen-inputs"})) >= 0

def test_delete_all_succeeded_job(self):
assert len(wrapper.delete_all_succeeded_job(project_id, "gen-tasks")) >= 0


def test_webhook():
Expand Down