Skip to content

introduce git info by default #2372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions metaflow/metadata_provider/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,20 @@ def _get_system_info_as_dict(self):
sys_info["r_version"] = env["r_version_code"]
return sys_info

def _get_git_info_as_dict(self):
git_info = {}
env = self._environment.get_environment_info()
for key in [
"script",
"repo_url",
"branch_name",
"commit_sha",
"has_uncommitted_changes",
]:
if key in env and env[key]:
git_info[key] = env[key]
return git_info

def _get_system_tags(self):
"""Convert system info dictionary into a list of system tags"""
return [
Expand Down Expand Up @@ -670,6 +684,16 @@ def _register_system_metadata(self, run_id, step_name, task_id, attempt):
tags=["attempt_id:{0}".format(attempt)],
)
)
# And add git metadata
metadata.extend(
MetaDatum(
field=str(k),
value=str(v),
type="git-info",
tags=["attempt_id:{0}".format(attempt)],
)
for k, v in self._get_git_info_as_dict().items()
)
if metadata:
self.register_metadata(run_id, step_name, task_id, metadata)

Expand Down
7 changes: 6 additions & 1 deletion metaflow/metaflow_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .util import get_username
from . import metaflow_version
from . import metaflow_git
from metaflow.exception import MetaflowException
from metaflow.extension_support import dump_module_info
from metaflow.mflog import BASH_MFLOG, BASH_FLUSH_LOGS
Expand Down Expand Up @@ -197,6 +198,10 @@ def get_environment_info(self, include_ext_info=False):
"python_version_code": "%d.%d.%d" % sys.version_info[:3],
"metaflow_version": metaflow_version.get_version(),
"script": os.path.basename(os.path.abspath(sys.argv[0])),
# Add git info
**metaflow_git.get_repository_info(
path=os.path.dirname(os.path.abspath(sys.argv[0]))
),
}
if R.use_r():
env["metaflow_r_version"] = R.metaflow_r_version()
Expand All @@ -206,7 +211,7 @@ def get_environment_info(self, include_ext_info=False):
# Information about extension modules (to load them in the proper order)
ext_key, ext_val = dump_module_info()
env[ext_key] = ext_val
return env
return {k: v for k, v in env.items() if v is not None and v != ""}

def executable(self, step_name, default=None):
if default is not None:
Expand Down
170 changes: 170 additions & 0 deletions metaflow/metaflow_git.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python
"""Get git repository information for the package

Functions to retrieve git repository details like URL, branch name,
and commit SHA for Metaflow code provenance tracking.
"""

import os
import subprocess
from os import path, name, environ
from typing import Dict, Union

# Cache for git information to avoid repeated subprocess calls.
# Stays None until get_repository_info() has run once; that function is the
# only place below that reads or writes it.
_git_info_cache = None

# Only get_repository_info is exported as public API of this module.
__all__ = ("get_repository_info",)

# Executable name used as the first argument of the git invocations below.
# On Windows (os.name == "nt") it is replaced further down by the result of
# find_git_on_windows().
GIT_COMMAND = "git"

if name == "nt":
    # Use the same git command finding logic as in metaflow_version.py
    def find_git_on_windows():
        """Find the path to the git executable on Windows.

        Returns:
            str: ``"git"`` when git is reachable through PATH, otherwise the
            full path of the first ``git.exe`` found in a well-known install
            location. Falls back to the bare ``"git"`` when nothing is found.
        """
        # first see if git is in the path
        try:
            subprocess.check_output(["where", "/Q", "git"])
            # if this command succeeded, git is in the path
            return "git"
        except (subprocess.CalledProcessError, OSError):
            # CalledProcessError: `where` ran but git was not found.
            # OSError: `where` itself could not be launched. This function
            # is called at import time, so that must not abort the import;
            # fall through to the well-known locations instead.
            pass
        # There are several locations where git.exe may be hiding
        possible_locations = []
        # look in program files for msysgit
        if "PROGRAMFILES(X86)" in environ:
            possible_locations.append(
                "%s/Git/cmd/git.exe" % environ["PROGRAMFILES(X86)"]
            )
        if "PROGRAMFILES" in environ:
            possible_locations.append("%s/Git/cmd/git.exe" % environ["PROGRAMFILES"])
        # look for the GitHub version of git
        if "LOCALAPPDATA" in environ:
            github_dir = "%s/GitHub" % environ["LOCALAPPDATA"]
            if path.isdir(github_dir):
                for subdir in os.listdir(github_dir):
                    if not subdir.startswith("PortableGit"):
                        continue
                    possible_locations.append(
                        "%s/%s/bin/git.exe" % (github_dir, subdir)
                    )
        # The first candidate that exists on disk wins
        for possible_location in possible_locations:
            if path.isfile(possible_location):
                return possible_location
        # git was not found
        return "git"

    GIT_COMMAND = find_git_on_windows()


def _get_repo_url(path: Union[str, os.PathLike]):
    """Get the URL of the ``origin`` remote from git config.

    Args:
        path: Directory in which git is executed.

    Returns:
        The remote URL as a string, or None when git could not be run or
        ``remote.origin.url`` is not set. Two normalizations are applied:

        * scp-style SSH remotes (``git@host:owner/repo``) are rewritten to
          ``https://host/owner/repo`` so they work as clickable links;
        * credentials embedded in an HTTP(S) remote
          (``https://user:token@host/...``) are removed, so that secrets are
          not carried along in the returned value.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    from urllib.parse import urlsplit, urlunsplit

    try:
        result = subprocess.run(
            [GIT_COMMAND, "config", "--get", "remote.origin.url"],
            cwd=path,
            capture_output=True,
            text=True,
            check=False,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if result.returncode != 0:
        return None

    url = result.stdout.strip()

    # Convert SSH URLs to HTTPS for clickable links
    if url.startswith("git@"):
        # Split on the FIRST colon only, so a colon inside the repository
        # path does not prevent the conversion.
        host, separator, repo_path = url[len("git@"):].partition(":")
        if separator:
            url = f"https://{host}/{repo_path}"
        return url

    # Drop "user[:password]@" from HTTP(S) remotes.
    try:
        parts = urlsplit(url)
    except ValueError:
        # Not parseable as a URL; hand it back unchanged.
        return url
    if parts.scheme in ("http", "https") and "@" in parts.netloc:
        bare_netloc = parts.netloc.rpartition("@")[2]
        url = urlunsplit(parts._replace(netloc=bare_netloc))
    return url


def _get_branch_name(path: Union[str, os.PathLike]):
    """Return the abbreviated name that HEAD points to.

    Runs ``git rev-parse --abbrev-ref HEAD`` in ``path``.

    Returns:
        The stripped command output, or None when git exits with a non-zero
        status or cannot be run at all.
    """
    # NOTE(review): for a detached HEAD git is expected to print the literal
    # "HEAD" here rather than a branch name -- confirm callers are fine with it.
    command = [GIT_COMMAND, "rev-parse", "--abbrev-ref", "HEAD"]
    try:
        completed = subprocess.run(
            command, cwd=path, capture_output=True, text=True, check=False
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if completed.returncode != 0:
        return None
    return completed.stdout.strip()


def _get_commit_sha(path: Union[str, os.PathLike]):
    """Return the SHA of the commit currently checked out in ``path``.

    Runs ``git rev-parse HEAD``.

    Returns:
        The stripped command output, or None when git exits with a non-zero
        status or cannot be run at all.
    """
    try:
        proc = subprocess.run(
            [GIT_COMMAND, "rev-parse", "HEAD"],
            cwd=path,
            capture_output=True,
            text=True,
            check=False,
        )
    except (OSError, subprocess.SubprocessError):
        sha = None
    else:
        succeeded = proc.returncode == 0
        sha = proc.stdout.strip() if succeeded else None
    return sha


def _is_in_git_repo(path: Union[str, os.PathLike]):
    """Tell whether ``path`` lies inside a git work tree.

    Runs ``git rev-parse --is-inside-work-tree`` in ``path``.

    Returns:
        True only when git exits with status 0 and prints ``true``;
        False otherwise, including when git cannot be run.
    """
    probe = [GIT_COMMAND, "rev-parse", "--is-inside-work-tree"]
    try:
        outcome = subprocess.run(
            probe, cwd=path, capture_output=True, text=True, check=False
        )
    except (OSError, subprocess.SubprocessError):
        return False
    if outcome.returncode != 0:
        return False
    return outcome.stdout.strip() == "true"


def _has_uncommitted_changes(path: Union[str, os.PathLike]):
    """Check if the git repository has uncommitted changes.

    Runs ``git status --porcelain`` in ``path``.

    Returns:
        True when the command prints anything, False when its output is
        empty, and None when the state cannot be determined (git could not
        be run, or it exited with a non-zero status).
    """
    try:
        result = subprocess.run(
            [GIT_COMMAND, "status", "--porcelain"],
            cwd=path,
            capture_output=True,
            text=True,
            check=False,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if result.returncode != 0:
        # A failed `git status` says nothing about the work tree; report
        # "unknown" rather than claiming there are no changes.
        return None
    # If output is not empty, there are uncommitted changes
    return bool(result.stdout.strip())


def get_repository_info(path: Union[str, os.PathLike]) -> Dict[str, Union[str, bool]]:
    """Get git repository information for a path

    Results are memoized per absolute path, so git is only invoked the first
    time a given directory is queried and different directories never see
    each other's result.

    Args:
        path: Directory to inspect.

    Returns:
        dict: Empty when ``path`` is not inside a git work tree, otherwise a
        dictionary containing (each value may be None when it could not be
        determined):
            repo_url: Repository URL (converted to HTTPS if from SSH)
            branch_name: Current branch name
            commit_sha: Current commit SHA
            has_uncommitted_changes: Boolean indicating if there are uncommitted changes
    """
    global _git_info_cache

    # `path` (the parameter) shadows the module-level `os.path` import, so
    # go through the `os` module explicitly.
    if path is None:
        cache_key = os.getcwd()
    else:
        cache_key = os.path.abspath(os.fspath(path))

    # The module initializes the cache to None; turn it into a mapping of
    # absolute path -> info on first use.
    if _git_info_cache is None:
        _git_info_cache = {}

    if cache_key not in _git_info_cache:
        info = {}
        if _is_in_git_repo(path):
            info = {
                "repo_url": _get_repo_url(path),
                "branch_name": _get_branch_name(path),
                "commit_sha": _get_commit_sha(path),
                "has_uncommitted_changes": _has_uncommitted_changes(path),
            }
        _git_info_cache[cache_key] = info

    # Hand out a copy so callers cannot modify the cached entry.
    return dict(_git_info_cache[cache_key])
Loading