Skip to content

IBMid Authentication support #1447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 224 additions & 2 deletions SoftLayer/API.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,25 @@
:license: MIT, see LICENSE for more details.
"""
# pylint: disable=invalid-name
import time
import warnings

import json
import logging
import requests


from SoftLayer import auth as slauth
from SoftLayer import config
from SoftLayer import consts
from SoftLayer import exceptions
from SoftLayer import transports

LOGGER = logging.getLogger(__name__)
API_PUBLIC_ENDPOINT = consts.API_PUBLIC_ENDPOINT
API_PRIVATE_ENDPOINT = consts.API_PRIVATE_ENDPOINT
CONFIG_FILE = consts.CONFIG_FILE

__all__ = [
'create_client_from_env',
'Client',
Expand Down Expand Up @@ -80,6 +89,8 @@ def create_client_from_env(username=None,
'Your Company'

"""
if config_file is None:
config_file = CONFIG_FILE
settings = config.get_client_settings(username=username,
api_key=api_key,
endpoint_url=endpoint_url,
Expand Down Expand Up @@ -127,7 +138,7 @@ def create_client_from_env(username=None,
settings.get('api_key'),
)

return BaseClient(auth=auth, transport=transport)
return BaseClient(auth=auth, transport=transport, config_file=config_file)


def Client(**kwargs):
Expand All @@ -150,8 +161,35 @@ class BaseClient(object):

_prefix = "SoftLayer_"

def __init__(self, auth=None, transport=None):
def __init__(self, auth=None, transport=None, config_file=None):
if config_file is None:
config_file = CONFIG_FILE
self.auth = auth
self.config_file = config_file
self.settings = config.get_config(self.config_file)

if transport is None:
url = self.settings['softlayer'].get('endpoint_url')
if url is not None and '/rest' in url:
# If this looks like a rest endpoint, use the rest transport
transport = transports.RestTransport(
endpoint_url=url,
proxy=self.settings['softlayer'].get('proxy'),
# prevents an exception incase timeout is a float number.
timeout=int(self.settings['softlayer'].getfloat('timeout')),
user_agent=consts.USER_AGENT,
verify=self.settings['softlayer'].getboolean('verify'),
)
else:
# Default the transport to use XMLRPC
transport = transports.XmlRpcTransport(
endpoint_url=url,
proxy=self.settings['softlayer'].get('proxy'),
timeout=int(self.settings['softlayer'].getfloat('timeout')),
user_agent=consts.USER_AGENT,
verify=self.settings['softlayer'].getboolean('verify'),
)

self.transport = transport

def authenticate_with_password(self, username, password,
Expand Down Expand Up @@ -322,6 +360,190 @@ def __len__(self):
return 0


class IAMClient(BaseClient):
"""IBM ID Client for using IAM authentication

:param auth: auth driver that looks like SoftLayer.auth.AuthenticationBase
:param transport: An object that's callable with this signature: transport(SoftLayer.transports.Request)
"""

def authenticate_with_password(self, username, password, security_question_id=None, security_question_answer=None):
"""Performs IBM IAM Username/Password Authentication

:param string username: your IBMid username
:param string password: your IBMid password
"""

iam_client = requests.Session()

headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': consts.USER_AGENT,
'Accept': 'application/json'
}
data = {
'grant_type': 'password',
'password': password,
'response_type': 'cloud_iam',
'username': username
}

try:
response = iam_client.request(
'POST',
'https://iam.cloud.ibm.com/identity/token',
data=data,
headers=headers,
auth=requests.auth.HTTPBasicAuth('bx', 'bx')
)
if response.status_code != 200:
LOGGER.error("Unable to login: %s", response.text)

response.raise_for_status()
tokens = json.loads(response.text)
except requests.HTTPError as ex:
error = json.loads(response.text)
raise exceptions.IAMError(response.status_code,
error.get('errorMessage'),
'https://iam.cloud.ibm.com/identity/token') from ex

self.settings['softlayer']['access_token'] = tokens['access_token']
self.settings['softlayer']['refresh_token'] = tokens['refresh_token']

config.write_config(self.settings, self.config_file)
self.auth = slauth.BearerAuthentication('', tokens['access_token'], tokens['refresh_token'])

return tokens

def authenticate_with_passcode(self, passcode):
"""Performs IBM IAM SSO Authentication

:param string passcode: your IBMid password
"""

iam_client = requests.Session()

headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': consts.USER_AGENT,
'Accept': 'application/json'
}
data = {
'grant_type': 'urn:ibm:params:oauth:grant-type:passcode',
'passcode': passcode,
'response_type': 'cloud_iam'
}

try:
response = iam_client.request(
'POST',
'https://iam.cloud.ibm.com/identity/token',
data=data,
headers=headers,
auth=requests.auth.HTTPBasicAuth('bx', 'bx')
)
if response.status_code != 200:
LOGGER.error("Unable to login: %s", response.text)

response.raise_for_status()
tokens = json.loads(response.text)

except requests.HTTPError as ex:
error = json.loads(response.text)
raise exceptions.IAMError(response.status_code,
error.get('errorMessage'),
'https://iam.cloud.ibm.com/identity/token') from ex

self.settings['softlayer']['access_token'] = tokens['access_token']
self.settings['softlayer']['refresh_token'] = tokens['refresh_token']
a_expire = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(tokens['expiration']))
r_expire = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(tokens['refresh_token_expiration']))
LOGGER.warning("Tokens retrieved, expires at %s, Refresh expires at %s", a_expire, r_expire)
config.write_config(self.settings, self.config_file)
self.auth = slauth.BearerAuthentication('', tokens['access_token'], tokens['refresh_token'])

return tokens

def authenticate_with_iam_token(self, a_token, r_token=None):
"""Authenticates to the SL API with an IAM Token

:param string a_token: Access token
:param string r_token: Refresh Token, to be used if Access token is expired.
"""
self.auth = slauth.BearerAuthentication('', a_token, r_token)

def refresh_iam_token(self, r_token, account_id=None, ims_account=None):
"""Refreshes the IAM Token, will default to values in the config file"""
iam_client = requests.Session()

headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': consts.USER_AGENT,
'Accept': 'application/json'
}
data = {
'grant_type': 'refresh_token',
'refresh_token': r_token,
'response_type': 'cloud_iam'
}

sl_config = self.settings['softlayer']

if account_id is None and sl_config.get('account_id', False):
account_id = sl_config.get('account_id')
if ims_account is None and sl_config.get('ims_account', False):
ims_account = sl_config.get('ims_account')

data['account'] = account_id
data['ims_account'] = ims_account

try:
response = iam_client.request(
'POST',
'https://iam.cloud.ibm.com/identity/token',
data=data,
headers=headers,
auth=requests.auth.HTTPBasicAuth('bx', 'bx')
)

if response.status_code != 200:
LOGGER.warning("Unable to refresh IAM Token. %s", response.text)

response.raise_for_status()
tokens = json.loads(response.text)

except requests.HTTPError as ex:
error = json.loads(response.text)
raise exceptions.IAMError(response.status_code,
error.get('errorMessage'),
'https://iam.cloud.ibm.com/identity/token') from ex

a_expire = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(tokens['expiration']))
r_expire = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(tokens['refresh_token_expiration']))
LOGGER.warning("Tokens retrieved, expires at %s, Refresh expires at %s", a_expire, r_expire)

self.settings['softlayer']['access_token'] = tokens['access_token']
self.settings['softlayer']['refresh_token'] = tokens['refresh_token']
config.write_config(self.settings, self.config_file)
self.auth = slauth.BearerAuthentication('', tokens['access_token'])
return tokens

def call(self, service, method, *args, **kwargs):
"""Handles refreshing IAM tokens in case of a HTTP 401 error"""
try:
return super().call(service, method, *args, **kwargs)
except exceptions.SoftLayerAPIError as ex:

if ex.faultCode == 401:
LOGGER.warning("Token has expired, trying to refresh. %s", ex.faultString)
return ex
else:
raise ex

def __repr__(self):
return "IAMClient(transport=%r, auth=%r)" % (self.transport, self.auth)


class Service(object):
"""A SoftLayer Service.

Expand Down
Loading