diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..73f69e0 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/sonarlint/issuestore/index.pb b/.idea/sonarlint/issuestore/index.pb new file mode 100644 index 0000000..e69de29 diff --git a/fastapi_jwt_auth/auth_jwt.py b/fastapi_jwt_auth/auth_jwt.py index 4110bdb..de1f96a 100644 --- a/fastapi_jwt_auth/auth_jwt.py +++ b/fastapi_jwt_auth/auth_jwt.py @@ -191,7 +191,7 @@ def _create_token( secret_key, algorithm=algorithm, headers=headers - ).decode('utf-8') + ) def _has_token_in_denylist_callback(self) -> bool: """ diff --git a/fastapi_jwt_auth/config.py b/fastapi_jwt_auth/config.py index c81b50c..db53240 100644 --- a/fastapi_jwt_auth/config.py +++ b/fastapi_jwt_auth/config.py @@ -5,81 +5,167 @@ validator, StrictBool, StrictInt, - StrictStr + StrictStr, ) +from pydantic import __version__ as pydantic_version -class LoadConfig(BaseModel): - authjwt_token_location: Optional[Sequence[StrictStr]] = {'headers'} - authjwt_secret_key: Optional[StrictStr] = None - authjwt_public_key: Optional[StrictStr] = None - authjwt_private_key: Optional[StrictStr] = None - authjwt_algorithm: Optional[StrictStr] = "HS256" - authjwt_decode_algorithms: Optional[List[StrictStr]] = None - authjwt_decode_leeway: Optional[Union[StrictInt,timedelta]] = 0 - authjwt_encode_issuer: Optional[StrictStr] = None - authjwt_decode_issuer: Optional[StrictStr] = None - authjwt_decode_audience: Optional[Union[StrictStr,Sequence[StrictStr]]] = None - authjwt_denylist_enabled: Optional[StrictBool] = False - authjwt_denylist_token_checks: Optional[Sequence[StrictStr]] = {'access','refresh'} - authjwt_header_name: Optional[StrictStr] = "Authorization" - authjwt_header_type: Optional[StrictStr] = "Bearer" - authjwt_access_token_expires: Optional[Union[StrictBool,StrictInt,timedelta]] = timedelta(minutes=15) - authjwt_refresh_token_expires: Optional[Union[StrictBool,StrictInt,timedelta]] = timedelta(days=30) - # option for create cookies - authjwt_access_cookie_key: Optional[StrictStr] = "access_token_cookie" - authjwt_refresh_cookie_key: Optional[StrictStr] = "refresh_token_cookie" - authjwt_access_cookie_path: Optional[StrictStr] = "/" - authjwt_refresh_cookie_path: Optional[StrictStr] = "/" - authjwt_cookie_max_age: Optional[StrictInt] = None - authjwt_cookie_domain: Optional[StrictStr] = None - authjwt_cookie_secure: Optional[StrictBool] = False - authjwt_cookie_samesite: Optional[StrictStr] = None - # option for double submit csrf protection - authjwt_cookie_csrf_protect: Optional[StrictBool] = True - authjwt_access_csrf_cookie_key: Optional[StrictStr] = "csrf_access_token" - authjwt_refresh_csrf_cookie_key: Optional[StrictStr] = "csrf_refresh_token" - authjwt_access_csrf_cookie_path: Optional[StrictStr] = "/" - authjwt_refresh_csrf_cookie_path: Optional[StrictStr] = "/" - authjwt_access_csrf_header_name: Optional[StrictStr] = "X-CSRF-Token" - authjwt_refresh_csrf_header_name: Optional[StrictStr] = "X-CSRF-Token" - authjwt_csrf_methods: Optional[Sequence[StrictStr]] = {'POST','PUT','PATCH','DELETE'} - - @validator('authjwt_access_token_expires') - def validate_access_token_expires(cls, v): - if v is True: - raise ValueError("The 'authjwt_access_token_expires' only accept value False (bool)") - return v - - @validator('authjwt_refresh_token_expires') - def validate_refresh_token_expires(cls, v): - if v is True: - raise ValueError("The 'authjwt_refresh_token_expires' only accept value False (bool)") - return v - - @validator('authjwt_denylist_token_checks', each_item=True) - def validate_denylist_token_checks(cls, v): - if v not in ['access','refresh']: - raise ValueError("The 'authjwt_denylist_token_checks' must be between 'access' or 'refresh'") - return v - - @validator('authjwt_token_location', each_item=True) - def validate_token_location(cls, v): - if v not in ['headers','cookies']: - raise ValueError("The 'authjwt_token_location' must be between 'headers' or 'cookies'") - return v - - @validator('authjwt_cookie_samesite') - def validate_cookie_samesite(cls, v): - if v not in ['strict','lax','none']: - raise ValueError("The 'authjwt_cookie_samesite' must be between 'strict', 'lax', 'none'") - return v - - @validator('authjwt_csrf_methods', each_item=True) - def validate_csrf_methods(cls, v): - if v.upper() not in {"GET", "HEAD", "POST", "PUT", "DELETE", "PATCH"}: - raise ValueError("The 'authjwt_csrf_methods' must be between http request methods") - return v.upper() - - class Config: - min_anystr_length = 1 - anystr_strip_whitespace = True +if pydantic_version[0] == "2": + from pydantic import field_validator + +if pydantic_version[0] == "2": + class LoadConfig(BaseModel): + authjwt_token_location: Optional[Sequence[StrictStr]] = {'headers'} + authjwt_secret_key: Optional[StrictStr] = None + authjwt_public_key: Optional[StrictStr] = None + authjwt_private_key: Optional[StrictStr] = None + authjwt_algorithm: Optional[StrictStr] = "HS256" + authjwt_decode_algorithms: Optional[List[StrictStr]] = None + authjwt_decode_leeway: Optional[Union[StrictInt,timedelta]] = 0 + authjwt_encode_issuer: Optional[StrictStr] = None + authjwt_decode_issuer: Optional[StrictStr] = None + authjwt_decode_audience: Optional[Union[StrictStr,Sequence[StrictStr]]] = None + authjwt_denylist_enabled: Optional[StrictBool] = False + authjwt_denylist_token_checks: Optional[Sequence[StrictStr]] = {'access','refresh'} + authjwt_header_name: Optional[StrictStr] = "Authorization" + authjwt_header_type: Optional[StrictStr] = "Bearer" + authjwt_access_token_expires: Optional[Union[StrictBool,StrictInt,timedelta]] = timedelta(minutes=15) + authjwt_refresh_token_expires: Optional[Union[StrictBool,StrictInt,timedelta]] = timedelta(days=30) + # option for create cookies + authjwt_access_cookie_key: Optional[StrictStr] = "access_token_cookie" + authjwt_refresh_cookie_key: Optional[StrictStr] = "refresh_token_cookie" + authjwt_access_cookie_path: Optional[StrictStr] = "/" + authjwt_refresh_cookie_path: Optional[StrictStr] = "/" + authjwt_cookie_max_age: Optional[StrictInt] = None + authjwt_cookie_domain: Optional[StrictStr] = None + authjwt_cookie_secure: Optional[StrictBool] = False + authjwt_cookie_samesite: Optional[StrictStr] = None + # option for double submit csrf protection + authjwt_cookie_csrf_protect: Optional[StrictBool] = True + authjwt_access_csrf_cookie_key: Optional[StrictStr] = "csrf_access_token" + authjwt_refresh_csrf_cookie_key: Optional[StrictStr] = "csrf_refresh_token" + authjwt_access_csrf_cookie_path: Optional[StrictStr] = "/" + authjwt_refresh_csrf_cookie_path: Optional[StrictStr] = "/" + authjwt_access_csrf_header_name: Optional[StrictStr] = "X-CSRF-Token" + authjwt_refresh_csrf_header_name: Optional[StrictStr] = "X-CSRF-Token" + authjwt_csrf_methods: Optional[Sequence[StrictStr]] = {'POST','PUT','PATCH','DELETE'} + + @field_validator('authjwt_access_token_expires') + def validate_access_token_expires(cls, v): + if v is True: + raise ValueError("The 'authjwt_access_token_expires' only accept value False (bool)") + return v + + @field_validator('authjwt_refresh_token_expires') + def validate_refresh_token_expires(cls, v): + if v is True: + raise ValueError("The 'authjwt_refresh_token_expires' only accept value False (bool)") + return v + + @field_validator('authjwt_denylist_token_checks') + def validate_denylist_token_checks(cls, values): + for v in values: + if v not in ['access','refresh']: + raise ValueError("The 'authjwt_denylist_token_checks' must be between 'access' or 'refresh'") + return values + + @field_validator('authjwt_token_location') + def validate_token_location(cls, values): + for v in values: + if v not in ['headers','cookies']: + raise ValueError("The 'authjwt_token_location' must be between 'headers' or 'cookies'") + return values + + @field_validator('authjwt_cookie_samesite') + def validate_cookie_samesite(cls, v): + if v not in ['strict','lax','none']: + raise ValueError("The 'authjwt_cookie_samesite' must be between 'strict', 'lax', 'none'") + return v + + @field_validator('authjwt_csrf_methods') + def validate_csrf_methods(cls, values): + output_values = [] + for v in values: + if v.upper() not in {"GET", "HEAD", "POST", "PUT", "DELETE", "PATCH"}: + raise ValueError("The 'authjwt_csrf_methods' must be between http request methods") + output_values.append(v.upper()) + return output_values + + class Config: + str_min_length = 1 + str_strip_whitespace = True +else: + class LoadConfig(BaseModel): + authjwt_token_location: Optional[Sequence[StrictStr]] = {'headers'} + authjwt_secret_key: Optional[StrictStr] = None + authjwt_public_key: Optional[StrictStr] = None + authjwt_private_key: Optional[StrictStr] = None + authjwt_algorithm: Optional[StrictStr] = "HS256" + authjwt_decode_algorithms: Optional[List[StrictStr]] = None + authjwt_decode_leeway: Optional[Union[StrictInt, timedelta]] = 0 + authjwt_encode_issuer: Optional[StrictStr] = None + authjwt_decode_issuer: Optional[StrictStr] = None + authjwt_decode_audience: Optional[Union[StrictStr, Sequence[StrictStr]]] = None + authjwt_denylist_enabled: Optional[StrictBool] = False + authjwt_denylist_token_checks: Optional[Sequence[StrictStr]] = {'access', 'refresh'} + authjwt_header_name: Optional[StrictStr] = "Authorization" + authjwt_header_type: Optional[StrictStr] = "Bearer" + authjwt_access_token_expires: Optional[Union[StrictBool, StrictInt, timedelta]] = timedelta(minutes=15) + authjwt_refresh_token_expires: Optional[Union[StrictBool, StrictInt, timedelta]] = timedelta(days=30) + # option for create cookies + authjwt_access_cookie_key: Optional[StrictStr] = "access_token_cookie" + authjwt_refresh_cookie_key: Optional[StrictStr] = "refresh_token_cookie" + authjwt_access_cookie_path: Optional[StrictStr] = "/" + authjwt_refresh_cookie_path: Optional[StrictStr] = "/" + authjwt_cookie_max_age: Optional[StrictInt] = None + authjwt_cookie_domain: Optional[StrictStr] = None + authjwt_cookie_secure: Optional[StrictBool] = False + authjwt_cookie_samesite: Optional[StrictStr] = None + # option for double submit csrf protection + authjwt_cookie_csrf_protect: Optional[StrictBool] = True + authjwt_access_csrf_cookie_key: Optional[StrictStr] = "csrf_access_token" + authjwt_refresh_csrf_cookie_key: Optional[StrictStr] = "csrf_refresh_token" + authjwt_access_csrf_cookie_path: Optional[StrictStr] = "/" + authjwt_refresh_csrf_cookie_path: Optional[StrictStr] = "/" + authjwt_access_csrf_header_name: Optional[StrictStr] = "X-CSRF-Token" + authjwt_refresh_csrf_header_name: Optional[StrictStr] = "X-CSRF-Token" + authjwt_csrf_methods: Optional[Sequence[StrictStr]] = {'POST', 'PUT', 'PATCH', 'DELETE'} + + @validator('authjwt_access_token_expires') + def validate_access_token_expires(cls, v): + if v is True: + raise ValueError("The 'authjwt_access_token_expires' only accept value False (bool)") + return v + + @validator('authjwt_refresh_token_expires') + def validate_refresh_token_expires(cls, v): + if v is True: + raise ValueError("The 'authjwt_refresh_token_expires' only accept value False (bool)") + return v + + @validator('authjwt_denylist_token_checks', each_item=True) + def validate_denylist_token_checks(cls, v): + if v not in ['access', 'refresh']: + raise ValueError("The 'authjwt_denylist_token_checks' must be between 'access' or 'refresh'") + return v + + @validator('authjwt_token_location', each_item=True) + def validate_token_location(cls, v): + if v not in ['headers', 'cookies']: + raise ValueError("The 'authjwt_token_location' must be between 'headers' or 'cookies'") + return v + + @validator('authjwt_cookie_samesite') + def validate_cookie_samesite(cls, v): + if v not in ['strict', 'lax', 'none']: + raise ValueError("The 'authjwt_cookie_samesite' must be between 'strict', 'lax', 'none'") + return v + + @validator('authjwt_csrf_methods', each_item=True) + def validate_csrf_methods(cls, v): + if v.upper() not in {"GET", "HEAD", "POST", "PUT", "DELETE", "PATCH"}: + raise ValueError("The 'authjwt_csrf_methods' must be between http request methods") + return v.upper() + + class Config: + min_anystr_length = 1 + anystr_strip_whitespace = True \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 6c4acab..87257b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ requires = [ "fastapi>=0.61.0", - "PyJWT>=1.7.1,<2.0.0" + "PyJWT>=2.1.0,<3.0.0" ] description-file = "README.md" diff --git a/tests/test_decode_token.py b/tests/test_decode_token.py index 5344d48..38e367a 100644 --- a/tests/test_decode_token.py +++ b/tests/test_decode_token.py @@ -51,7 +51,7 @@ def default_access_token(): @pytest.fixture(scope='function') def encoded_token(default_access_token): - return jwt.encode(default_access_token,'secret-key',algorithm='HS256').decode('utf-8') + return jwt.encode(default_access_token,'secret-key',algorithm='HS256') def test_verified_token(client,encoded_token,Authorize): class SettingsOne(BaseSettings): @@ -67,7 +67,7 @@ def get_settings_one(): assert response.status_code == 422 assert response.json() == {'detail': 'Not enough segments'} # InvalidSignatureError - token = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256').decode('utf-8') + token = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS256') response = client.get('/protected',headers={"Authorization":f"Bearer {token}"}) assert response.status_code == 422 assert response.json() == {'detail': 'Signature verification failed'} @@ -78,7 +78,7 @@ def get_settings_one(): assert response.status_code == 422 assert response.json() == {'detail': 'Signature has expired'} # InvalidAlgorithmError - token = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS384').decode('utf-8') + token = jwt.encode({'some': 'payload'}, 'secret', algorithm='HS384') response = client.get('/protected',headers={"Authorization":f"Bearer {token}"}) assert response.status_code == 422 assert response.json() == {'detail': 'The specified alg value is not allowed'}