1
1
2
2
from typing import Optional
3
3
from botocore .auth import SigV4Auth
4
+ from botocore .awsrequest import AWSRequest
4
5
from botocore .credentials import Credentials
5
6
import botocore .session
6
7
7
8
8
- class SigV4AuthFactory :
9
+ class AwsSignedRequest :
9
10
"""
10
- SigV4 authentication utility
11
+ Authenticating Requests (AWS Signature Version 4)
11
12
12
13
Args:
13
14
region (str): AWS region
@@ -24,28 +25,47 @@ class SigV4AuthFactory:
24
25
**Using default credentials**
25
26
>>> from aws_lambda_powertools.utilities.iam import SigV4AuthFactory
26
27
>>> auth = SigV4AuthFactory(region="us-east-2", service="vpc-lattice-svcs")
28
+ """
27
29
28
30
29
-
30
- """
31
31
def __init__ (
32
32
self ,
33
- region : str ,
34
33
service : str ,
34
+ method : str ,
35
+ url : str ,
36
+ data : Optional [str ],
37
+ params : Optional [str ],
38
+ headers : Optional [str ],
35
39
access_key : Optional [str ],
36
40
secret_key : Optional [str ],
37
41
token : Optional [str ],
42
+ region : Optional [str ],
43
+ sign_payload : Optional [bool ] = False ,
38
44
):
39
- self . _region = region
45
+
40
46
self ._service = service
47
+ self ._method = method
48
+ self ._url = url
49
+ self ._data = data
50
+ self ._params = params
51
+ self ._headers = headers
52
+
53
+ if not region :
54
+ self ._region = botocore .session .Session ().get_config_variable ("region" )
55
+ else :
56
+ self ._region = region
41
57
42
58
if access_key and secret_key or token :
43
59
self ._access_key = access_key
44
60
self ._secret_key = secret_key
45
61
self ._credentials = Credentials (access_key = self ._access_key , secret_key = self ._secret_key , token = token )
46
-
47
62
else :
48
63
self ._credentials = botocore .session .Session ().get_credentials ()
49
64
50
65
def __call__ (self ):
51
- return SigV4Auth (credentials = self ._credentials , service = self ._service , region = self ._region )
66
+ request = AWSRequest (method = self ._method , url = self ._url , data = self ._data , params = self ._params , headers = self ._headers )
67
+ if sign_payload is False :
68
+ request .context ["payload_signing_enabled" ] = False
69
+
70
+ signed_request = SigV4Auth (credentials = self ._credentials , service_name = self ._service , region_name = self ._region ).add_auth (request )
71
+ return signed_request .prepare ()
0 commit comments