Skip to content

fix(composer): fix version check logic for 'airflow_db_cleanup.py' #13295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 49 additions & 25 deletions composer/workflows/airflow_db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Note: This sample is intended to work with Airflow 1 and 2.

# [START composer_metadb_cleanup]
"""
A maintenance workflow that you can deploy into Airflow to periodically clean
"""A maintenance workflow that you can deploy into Airflow to periodically clean
out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
having too much data in your Airflow MetaStore.

Expand Down Expand Up @@ -72,29 +73,52 @@

# airflow-db-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")

START_DATE = airflow.utils.dates.days_ago(1)
# How often to Run. @daily - Once a day at Midnight (UTC)

# How often to Run. @daily - Once a day at Midnight (UTC).
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server

# Who is listed as the owner of this DAG in the Airflow Web Server.
DAG_OWNER_NAME = "operations"
# List of email address to send email alerts to if this job fails

# List of email address to send email alerts to if this job fails.
ALERT_EMAIL_ADDRESSES = []
# Airflow version used by the environment in list form, value stored in
# airflow_version is in format e.g "2.3.4+composer"
AIRFLOW_VERSION = airflow_version[: -len("+composer")].split(".")
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that are 30 days old or older.


# TODO(developer): Update the following lines if you are using a version
# with non-numerical characters such as "2.9.3rc1".

# Airflow version used by the environment as a list of integers.
# For example: [2, 9, 2]
#
# The value in `airflow_version` is in a format such as "2.9.2+composer".
# It's converted to facilitate version comparison.
COMPOSER_SUFFIX = "+composer"

# Strip the Composer build suffix (if present) before splitting, so only
# the numeric "major.minor.patch" part is parsed.
airflow_version_without_suffix = (
    airflow_version[: -len(COMPOSER_SUFFIX)]
    if airflow_version.endswith(COMPOSER_SUFFIX)
    else airflow_version
)
AIRFLOW_VERSION_STR = airflow_version_without_suffix.split(".")
# Integer components compare correctly ([2, 10, 0] > [2, 9, 2]),
# unlike string components ("10" < "9").
AIRFLOW_VERSION = [int(part) for part in AIRFLOW_VERSION_STR]

# Length to retain the log files if not already provided in the configuration.
# If this is set to 30, the job will remove those files
# that are 30 days old or older.
DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30)
)
# Prints the database entries which will be getting deleted; set to False
# to avoid printing large lists and slowdown process

# Prints the database entries which will be getting deleted;
# set to False to avoid printing large lists and slowing down the process.
PRINT_DELETES = False
# Whether the job should delete the db entries or not. Included if you want to
# temporarily avoid deleting the db entries.

# Whether the job should delete the DB entries or not.
# Included if you want to temporarily avoid deleting the DB entries.
ENABLE_DELETE = True
# List of all the objects that will be deleted. Comment out the DB objects you
# want to skip.

# List of all the objects that will be deleted.
# Comment out the DB objects you want to skip.
DATABASE_OBJECTS = [
{
"airflow_db_model": DagRun,
Expand All @@ -106,8 +130,9 @@
{
"airflow_db_model": TaskInstance,
"age_check_column": TaskInstance.start_date
if AIRFLOW_VERSION < ["2", "2", "0"]
else TaskInstance.start_date,
# TODO: Confirm why this was added, as it doesn't make sense.
if AIRFLOW_VERSION < [2, 2, 0]
else TaskInstance.start_date,
"keep_last": False,
"keep_last_filters": None,
"keep_last_group_by": None,
Expand All @@ -122,8 +147,8 @@
{
"airflow_db_model": XCom,
"age_check_column": XCom.execution_date
if AIRFLOW_VERSION < ["2", "2", "5"]
else XCom.timestamp,
if AIRFLOW_VERSION < [2, 2, 5]
else XCom.timestamp,
"keep_last": False,
"keep_last_filters": None,
"keep_last_group_by": None,
Expand Down Expand Up @@ -152,8 +177,8 @@
{
"airflow_db_model": TaskReschedule,
"age_check_column": TaskReschedule.execution_date
if AIRFLOW_VERSION < ["2", "2", "0"]
else TaskReschedule.start_date,
if AIRFLOW_VERSION < [2, 2, 0]
else TaskReschedule.start_date,
"keep_last": False,
"keep_last_filters": None,
"keep_last_group_by": None,
Expand Down Expand Up @@ -181,7 +206,7 @@
logging.error(e)

# Check for RenderedTaskInstanceFields model
if AIRFLOW_VERSION < ["2", "4", "0"]:
if AIRFLOW_VERSION < [2, 4, 0]:
try:
from airflow.models import RenderedTaskInstanceFields

Expand Down Expand Up @@ -216,7 +241,7 @@
except Exception as e:
logging.error(e)

if AIRFLOW_VERSION < ["2", "6", "0"]:
if AIRFLOW_VERSION < [2, 6, 0]:
try:
from airflow.jobs.base_job import BaseJob

Expand Down Expand Up @@ -529,5 +554,4 @@ def analyze_db():

print_configuration.set_downstream(cleanup_op)
cleanup_op.set_downstream(analyze_op)

# [END composer_metadb_cleanup]
25 changes: 25 additions & 0 deletions composer/workflows/airflow_db_cleanup_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,31 @@
import internal_unit_testing


def convert_version_str_to_list(version: str) -> list[int]:
    """Convert an Airflow version string to a list of integers.

    Mirrors the conversion logic used in ``airflow_db_cleanup.py`` so the
    version-comparison behavior can be validated in isolation.

    For example, ``"2.10.5+composer"`` -> ``[2, 10, 5]``.

    Args:
        version: A version string, optionally carrying the Composer
            build suffix (e.g. "2.9.2" or "2.9.2+composer").

    Returns:
        The numeric version components as a list of ints, suitable for
        lexicographic comparison against other such lists.
    """
    # Locals use snake_case (PEP 8); UPPER_SNAKE_CASE is for module constants.
    composer_suffix = "+composer"
    if version.endswith(composer_suffix):
        version = version[: -len(composer_suffix)]
    return [int(part) for part in version.split(".")]


def test_version_comparison():
    # b/408307862 - Validate version check logic used in the sample.
    composer_build = convert_version_str_to_list("2.10.5+composer")
    assert composer_build == [2, 10, 5]
    assert composer_build > [2, 9, 1]

    plain_build = convert_version_str_to_list("2.9.2")
    assert plain_build == [2, 9, 2]
    assert plain_build < [2, 9, 3]


def test_dag_import(airflow_database):
"""Test that the DAG file can be successfully imported.

Expand Down
2 changes: 1 addition & 1 deletion composer/workflows/noxfile_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"3.8",
"3.9",
"3.10",
"3.12",
# "3.12",
], # Composer w/ Airflow 2 only supports Python 3.8
# Old samples are opted out of enforcing Python type hints
# All new samples should feature them
Expand Down
1 change: 1 addition & 0 deletions composer/workflows/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
# https://github.com/apache/airflow/blob/main/pyproject.toml

apache-airflow[amazon,apache.beam,cncf.kubernetes,google,microsoft.azure,openlineage,postgres]==2.9.2
# apache-airflow[amazon,apache.beam,cncf.kubernetes,google,microsoft.azure,openlineage,postgres]==2.10.5
google-cloud-dataform==0.5.9 # used in Dataform operators
scipy==1.14.1