Skip to content

PersonalAccessTokenに対応しました #679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 11, 2024
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ password = "YYYYYY"
service = build(user_id, password)
```

### PersonalAccessTokenをコンストラクタ引数に渡す場合

```python
# APIアクセス用のインスタンスを生成
from annofabapi import build


pat = "XXXXXX"

service = build(pat = pat)
```


### `.netrc`に認証情報を記載する場合
`.netrc`ファイルに、AnnofabのユーザIDとパスワードを記載します。

Expand All @@ -91,7 +104,13 @@ service = build_from_netrc()


### 環境変数に認証情報を設定する場合
環境変数`ANNOFAB_USER_ID`、`ANNOFAB_PASSWORD`にユーザIDとパスワードを設定します。


* IDとパスワードで認証する場合
* 環境変数`ANNOFAB_USER_ID`、`ANNOFAB_PASSWORD`にユーザIDとパスワードを設定します。
* パーソナルアクセストークンで認証する場合
* 環境変数`ANNOFAB_PAT`にトークンを設定します。
* `ANNOFAB_PAT`が設定されている場合、`ANNOFAB_USER_ID`、`ANNOFAB_PASSWORD`は無視されます。

```python
from annofabapi import build_from_env
Expand All @@ -109,6 +128,8 @@ service = build()

優先順位は以下の通りです。
1. 環境変数
1. `ANNOFAB_PAT`
2. `ANNOFAB_USER_ID`及び`ANNOFAB_PASSWORD`
2. `.netrc`


Expand Down
104 changes: 73 additions & 31 deletions annofabapi/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import time
from functools import wraps
from json import JSONDecodeError
from typing import Any, Callable, Collection, Dict, Optional, Tuple
from typing import Any, Callable, Collection, Dict, Optional, Tuple, Union

import backoff
import requests
from requests.auth import AuthBase
from requests.cookies import RequestsCookieJar

from annofabapi.credentials import IdPass, Pat, Tokens
from annofabapi.exceptions import InvalidMfaCodeError, MfaEnabledUserExecutionError, NotLoggedInError
from annofabapi.generated_api import AbstractAnnofabApi
from annofabapi.util.type_util import assert_noreturn

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -234,34 +236,33 @@ class AnnofabApi(AbstractAnnofabApi):
Web APIに対応したメソッドが存在するクラス。

Args:
login_user_id: AnnofabにログインするときのユーザID
login_password: Annofabにログインするときのパスワード
credentials: Annofabにログインするときの認証情報
endpoint_url: Annofab APIのエンドポイント。
input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか
input_mfa_code_via_stdin: MFAコードを標準入力から入力するかどうか Falseの時にMFAコードの入力を求められた場合は例外を送出する

Attributes:
token_dict: login, refresh_tokenで取得したtoken情報
tokens: login, refresh_tokenで取得したtoken情報
cookies: Signed Cookie情報
"""

def __init__(
self, login_user_id: str, login_password: str, *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False
) -> None:
if not login_user_id or not login_password:
def __init__(self, credentials: Union[IdPass, Pat], *, endpoint_url: str = DEFAULT_ENDPOINT_URL, input_mfa_code_via_stdin: bool = False) -> None:
if isinstance(credentials, IdPass) and (not credentials.user_id or not credentials.password):
raise ValueError("login_user_id or login_password is empty.")
if isinstance(credentials, Pat) and not credentials.token:
raise ValueError("pat is empty.")

self.login_user_id = login_user_id
self.login_password = login_password
self.credentials = credentials
self.endpoint_url = endpoint_url
self.input_mfa_code_via_stdin = input_mfa_code_via_stdin
self.url_prefix = f"{endpoint_url}/api/v1"
self.session = requests.Session()

self.token_dict: Optional[Dict[str, Any]] = None
self.tokens: Union[Tokens, Pat, None] = None
Copy link
Collaborator

@yuji38kwmt yuji38kwmt Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

興味による質問です。
self.tokensの型はUnion[HasAuthToken, None]でもよいように思ったのですが、Union[Tokens, Pat, None]の方がよいのでしょうか?

Copy link
Contributor Author

@seraphr seraphr Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

その定義だと、以下の部分で型チェックが通りません。
class HasAuthTokenは、別の場所で継承が可能であり、PatでもNoneでも無いことがわかっていても、Tokensであるとは限らないからです。
class HasAuthTokenは、PatTokensに定義されているauth_tokenという関数が、同じ意味であることを示すためのマーカーとして定義していて、self.tokensの型として使うことを想定していません。

if isinstance(self.tokens, Pat):
self.tokens = None
return
request_body = self.tokens.to_dict()

if isinstance(self.tokens, Pat):
return
request_body = {"refresh_token": self.tokens.refresh_token}


Union[HasAuthToken, None]みたいな定義はオブジェクト指向的なサブタイプポリモフィズムを想定していて、Union[Tokens, Pat, None]みたいな定義は代数的データ型を想定しています。
それぞれの利点・欠点は、そのままオブジェクト指向的なデータ定義と代数的データ型の利点・欠点を引き継ぎます(cf. Expression Problem

オブジェクト指向的なサブタイプポリモフィズムで十全に使えるように、PatTokens(及び IdPass)を定義するのは、難しいとは言いませんが、その構造が今のapi.pyの構造から乖離するので面倒です。
loginlogoutrefreshの責務自体を(そうなったら名前は変わりますが)HasAuthTokenに持たせなければいけなくなるでしょう(PatTokensの処理の大きな違いはその辺りであり、その差を吸収する必要があるため)。
また、その場合self.credentialsを含めて抽象化する必要があるでしょう。

今後近いうちに、パーソナルアクセストークンとパスワード認証に加えて新しい認証方法が追加されることが想定されている場合は、オブジェクト指向的に定義しておくと後の手間が省けます。
が、現状そういうことは想定していないので、実装の手間が低い方を選んでいます。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ありがとうございます。理解できました!


self.cookies: Optional[RequestsCookieJar] = None

self.__account_id: Optional[str] = None
self.__user_id: Optional[str] = None

class _MyToken(AuthBase):
"""
Expand Down Expand Up @@ -328,8 +329,9 @@ def _create_kwargs(
"params": new_params,
"headers": headers,
}
if self.token_dict is not None:
kwargs.update({"auth": self._MyToken(self.token_dict["id_token"])})
if self.tokens is not None:
token = self.tokens.auth_token
kwargs.update({"auth": self._MyToken(token)})

if request_body is not None:
if isinstance(request_body, (dict, list)):
Expand Down Expand Up @@ -495,6 +497,12 @@ def _request_wrapper(
else:
url = f"{self.url_prefix}{url_path}"

# patを使う場合は最初にtokensをセットする
# def logoutの呼び出しでtokensがNoneになった後にAPIを呼び出しても問題ないように(IdPassの場合も、自動loginしているので、その代わり)
# IdPassと同じ処理に合流させてしまうと、patが無効なときに無限ループしてしまうので、ここで1回だけ呼び出す
if self.tokens is None and isinstance(self.credentials, Pat):
self._login_pat(self.credentials)

kwargs = self._create_kwargs(query_params, header_params, request_body)
response = self.session.request(method=http_method.lower(), url=url, **kwargs)
# response.requestよりメソッド引数のrequest情報の方が分かりやすいので、メソッド引数のrequest情報を出力する。
Expand All @@ -512,8 +520,8 @@ def _request_wrapper(
},
)

# Unauthorized Errorならば、ログイン後に再度実行する
if response.status_code == requests.codes.unauthorized:
# ID/PASSが指定されており、Unauthorized Errorならば、ログイン後に再度実行する
if isinstance(self.credentials, IdPass) and response.status_code == requests.codes.unauthorized:
self.refresh_token()
return self._request_wrapper(
http_method,
Expand Down Expand Up @@ -616,7 +624,7 @@ def _request_get_with_cookie(self, project_id: str, url: str) -> requests.Respon
#########################################
# Public Method : Login
#########################################
def _login_respond_to_auth_challenge(self, mfa_code: str, session: str) -> Dict[str, Any]:
def _login_respond_to_auth_challenge(self, id_pass: IdPass, mfa_code: str, session: str) -> Dict[str, Any]:
"""
MFAコードによるログインを実行します。

Expand All @@ -629,7 +637,7 @@ def _login_respond_to_auth_challenge(self, mfa_code: str, session: str) -> Dict[
Raises:
InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合
"""
request_body = {"user_id": self.login_user_id, "mfa_code": mfa_code, "session": session}
request_body = {"user_id": id_pass.user_id, "mfa_code": mfa_code, "session": session}
url = f"{self.url_prefix}/login-respond-to-auth-challenge"

response = self._execute_http_request("post", url, json=request_body, raise_for_status=False)
Expand All @@ -645,7 +653,7 @@ def _login_respond_to_auth_challenge(self, mfa_code: str, session: str) -> Dict[
if self.input_mfa_code_via_stdin:
logger.info(new_error_message)
new_mfa_code = _read_mfa_code_from_stdin()
return self._login_respond_to_auth_challenge(new_mfa_code, session)
return self._login_respond_to_auth_challenge(id_pass, new_mfa_code, session)
else:
raise InvalidMfaCodeError(new_error_message)

Expand All @@ -671,7 +679,15 @@ def login(self, mfa_code: Optional[str] = None) -> None:
InvalidMfaCodeError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が正しくない場合
MfaEnabledUserExecutionError: ``self.input_mfa_code_via_stdin`` が ``False`` AND ``mfa_code`` が未指定の場合
"""
login_info = {"user_id": self.login_user_id, "password": self.login_password}
if isinstance(self.credentials, IdPass):
self._login_id_pass(self.credentials, mfa_code)
elif isinstance(self.credentials, Pat):
self._login_pat(self.credentials)
else:
assert_noreturn(self.credentials)

def _login_id_pass(self, id_pass: IdPass, mfa_code: Optional[str] = None) -> None:
login_info = {"user_id": id_pass.user_id, "password": id_pass.password}

url = f"{self.url_prefix}/login"

Expand All @@ -683,21 +699,24 @@ def login(self, mfa_code: Optional[str] = None) -> None:
if self.input_mfa_code_via_stdin:
mfa_code = _read_mfa_code_from_stdin()
else:
raise MfaEnabledUserExecutionError(self.login_user_id)
raise MfaEnabledUserExecutionError(id_pass.user_id)

mfa_json_obj = self._login_respond_to_auth_challenge(mfa_code, login_json_obj["session"])
mfa_json_obj = self._login_respond_to_auth_challenge(id_pass, mfa_code, login_json_obj["session"])
token_dict = mfa_json_obj["token"]
else:
# `login` APIのレスポンスのスキーマがloginRespondToAuthChallengeのとき
token_dict = login_json_obj["token"]

self.token_dict = token_dict
logger.debug("Logged in successfully. user_id = %s", self.login_user_id)
self.tokens = Tokens.from_dict(token_dict)
logger.debug("Logged in successfully. user_id = %s", id_pass.user_id)

def _login_pat(self, pat: Pat) -> None:
self.tokens = pat

def logout(self) -> None:
"""
ログアウトします。
ログアウト後は、インスタンス変数 ``token_dict`` をNoneにします。
ログアウト後は、インスタンス変数 ``tokens`` をNoneにします。



Expand All @@ -708,27 +727,33 @@ def logout(self) -> None:
NotLoggedInError: ログインしてない状態で関数を呼び出したときのエラー
"""

if self.token_dict is None:
if self.tokens is None:
raise NotLoggedInError
if isinstance(self.tokens, Pat):
self.tokens = None
return

request_body = self.token_dict
request_body = self.tokens.to_dict()
url = f"{self.url_prefix}/logout"
self._execute_http_request("POST", url, json=request_body)
self.token_dict = None
self.tokens = None

def refresh_token(self) -> None:
"""
トークンを再発行して、新しいトークン情報をインスタンスに保持します。
パーソナルアクセストークンでのアクセスをしている場合はrefreshを行いません。
ログインしていない場合やリフレッシュトークンの有効期限が切れている場合は、login APIを実行して新しいトークン情報をインスタンスに保持します。

"""

if self.token_dict is None:
if self.tokens is None:
# 一度もログインしていないときは、login APIを実行して、トークン情報をインスタンスに保持する(login関数内でインスタンスに保持している)
self.login()
return
if isinstance(self.tokens, Pat):
return

request_body = {"refresh_token": self.token_dict["refresh_token"]}
request_body = {"refresh_token": self.tokens.refresh_token}
url = f"{self.url_prefix}/refresh-token"
response = self._execute_http_request("POST", url, json=request_body)

Expand All @@ -737,11 +762,12 @@ def refresh_token(self) -> None:
self.login()
return

self.token_dict = response.json()
self.tokens = Tokens.from_dict(response.json())

#########################################
# Public Method : Other
#########################################

@property
def account_id(self) -> str:
"""
Expand All @@ -754,3 +780,19 @@ def account_id(self) -> str:
account_id = content["account_id"]
self.__account_id = account_id
return account_id

@property
def login_user_id(self) -> str:
"""
Annofabにログインするユーザのuser_id
"""
if self.__user_id is not None:
return self.__user_id
if isinstance(self.credentials, IdPass):
self.__user_id = self.credentials.user_id
return self.__user_id
else:
content, _ = self.get_my_account()
user_id = content["user_id"]
self.__user_id = user_id
return user_id
48 changes: 48 additions & 0 deletions annofabapi/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from dataclasses import dataclass
from typing import Dict, Protocol


class HasAuthToken(Protocol):
@property
def auth_token(self) -> str: ...


@dataclass(frozen=True)
class IdPass:
user_id: str
password: str


@dataclass(frozen=True)
class Pat(HasAuthToken):
"""Personal Access Token"""

token: str

@property
def auth_token(self) -> str:
return f"Bearer {self.token}"


@dataclass(frozen=True)
class Tokens(HasAuthToken):
"""IdPassを元にログインしたあとに取得されるトークン情報"""

id_token: str
access_token: str
refresh_token: str

@property
def auth_token(self) -> str:
return self.id_token

def to_dict(self) -> Dict[str, str]:
return {
"id_token": self.id_token,
"access_token": self.access_token,
"refresh_token": self.refresh_token,
}

@staticmethod
def from_dict(d: Dict[str, str]) -> "Tokens":
return Tokens(id_token=d["id_token"], access_token=d["access_token"], refresh_token=d["refresh_token"])
Loading