13
13
# See the License for the specific language governing permissions and
14
14
# limitations under the License.rom googleapiclient import discovery
15
15
16
+ # [START kms_asymmetric_imports]
16
17
import base64
17
18
import hashlib
18
19
19
20
from cryptography .exceptions import InvalidSignature
20
21
from cryptography .hazmat .backends import default_backend
21
22
from cryptography .hazmat .primitives import hashes , serialization
22
23
from cryptography .hazmat .primitives .asymmetric import ec , padding , utils
24
+ # [END kms_asymmetric_imports]
23
25
24
26
25
27
# [START kms_get_asymmetric_public]
@@ -43,35 +45,34 @@ def getAsymmetricPublicKey(client, key_path):
43
45
# [START kms_decrypt_rsa]
44
46
def decryptRSA (ciphertext , client , key_path ):
45
47
"""
46
- Decrypt a given ciphertext using an 'RSA_DECRYPT_OAEP_2048_SHA256' private
47
- key stored on Cloud KMS
48
+ Decrypt the input ciphertext (bytes) using an
49
+ 'RSA_DECRYPT_OAEP_2048_SHA256' private key stored on Cloud KMS
48
50
"""
51
+ request_body = {'ciphertext' : base64 .b64encode (ciphertext ).decode ('utf-8' )}
49
52
request = client .projects () \
50
53
.locations () \
51
54
.keyRings () \
52
55
.cryptoKeys () \
53
56
.cryptoKeyVersions () \
54
57
.asymmetricDecrypt (name = key_path ,
55
- body = { 'ciphertext' : ciphertext } )
58
+ body = request_body )
56
59
response = request .execute ()
57
- plaintext = base64 .b64decode (response ['plaintext' ]). decode ( 'utf-8' )
60
+ plaintext = base64 .b64decode (response ['plaintext' ])
58
61
return plaintext
59
62
# [END kms_decrypt_rsa]
60
63
61
64
62
65
# [START kms_encrypt_rsa]
63
- def encryptRSA (message , client , key_path ):
66
+ def encryptRSA (plaintext , client , key_path ):
64
67
"""
65
- Encrypt message locally using an 'RSA_DECRYPT_OAEP_2048_SHA256' public
66
- key retrieved from Cloud KMS
68
+ Encrypt the input plaintext (bytes) locally using an
69
+ 'RSA_DECRYPT_OAEP_2048_SHA256' public key retrieved from Cloud KMS
67
70
"""
68
71
public_key = getAsymmetricPublicKey (client , key_path )
69
72
pad = padding .OAEP (mgf = padding .MGF1 (algorithm = hashes .SHA256 ()),
70
73
algorithm = hashes .SHA256 (),
71
74
label = None )
72
- ciphertext = public_key .encrypt (message .encode ('ascii' ), pad )
73
- ciphertext = base64 .b64encode (ciphertext ).decode ('utf-8' )
74
- return ciphertext
75
+ return public_key .encrypt (plaintext , pad )
75
76
# [END kms_encrypt_rsa]
76
77
77
78
@@ -82,7 +83,7 @@ def signAsymmetric(message, client, key_path):
82
83
"""
83
84
# Note: some key algorithms will require a different hash function
84
85
# For example, EC_SIGN_P384_SHA384 requires SHA384
85
- digest_bytes = hashlib .sha256 (message . encode ( 'ascii' ) ).digest ()
86
+ digest_bytes = hashlib .sha256 (message ).digest ()
86
87
digest64 = base64 .b64encode (digest_bytes )
87
88
88
89
digest_JSON = {'sha256' : digest64 .decode ('utf-8' )}
@@ -94,24 +95,22 @@ def signAsymmetric(message, client, key_path):
94
95
.asymmetricSign (name = key_path ,
95
96
body = {'digest' : digest_JSON })
96
97
response = request .execute ()
97
- return response .get ('signature' , None )
98
+ return base64 . b64decode ( response .get ('signature' , None ) )
98
99
# [END kms_sign_asymmetric]
99
100
100
101
101
102
# [START kms_verify_signature_rsa]
102
103
def verifySignatureRSA (signature , message , client , key_path ):
103
104
"""
104
105
Verify the validity of an 'RSA_SIGN_PSS_2048_SHA256' signature for the
105
- specified plaintext message
106
+ specified message
106
107
"""
107
108
public_key = getAsymmetricPublicKey (client , key_path )
108
-
109
- digest_bytes = hashlib .sha256 (message .encode ('ascii' )).digest ()
110
- sig_bytes = base64 .b64decode (signature )
109
+ digest_bytes = hashlib .sha256 (message ).digest ()
111
110
112
111
try :
113
112
# Attempt verification
114
- public_key .verify (sig_bytes ,
113
+ public_key .verify (signature ,
115
114
digest_bytes ,
116
115
padding .PSS (mgf = padding .MGF1 (hashes .SHA256 ()),
117
116
salt_length = 32 ),
@@ -127,16 +126,14 @@ def verifySignatureRSA(signature, message, client, key_path):
127
126
def verifySignatureEC (signature , message , client , key_path ):
128
127
"""
129
128
Verify the validity of an 'EC_SIGN_P256_SHA256' signature
130
- for the specified plaintext message
129
+ for the specified message
131
130
"""
132
131
public_key = getAsymmetricPublicKey (client , key_path )
133
-
134
- digest_bytes = hashlib .sha256 (message .encode ('ascii' )).digest ()
135
- sig_bytes = base64 .b64decode (signature )
132
+ digest_bytes = hashlib .sha256 (message ).digest ()
136
133
137
134
try :
138
135
# Attempt verification
139
- public_key .verify (sig_bytes ,
136
+ public_key .verify (signature ,
140
137
digest_bytes ,
141
138
ec .ECDSA (utils .Prehashed (hashes .SHA256 ())))
142
139
# No errors were thrown. Verification was successful
0 commit comments