1
- import os
2
-
3
1
import httpx
4
2
import mcp .types as types
5
3
6
- AIRFLOW_HOST = os .getenv ("AIRFLOW_HOST" ).rstrip ("/" )
7
- AIRFLOW_USERNAME = os .getenv ("AIRFLOW_USERNAME" )
8
- AIRFLOW_PASSWORD = os .getenv ("AIRFLOW_PASSWORD" )
4
+ from src .envs import AIRFLOW_HOST , AIRFLOW_PASSWORD , AIRFLOW_USERNAME
5
+
9
6
10
7
def get_dag_url (dag_id : str ) -> str :
11
8
return f"{ AIRFLOW_HOST } /dags/{ dag_id } /grid"
12
9
10
+
13
11
def get_dag_run_url (dag_id : str , dag_run_id : str ) -> str :
14
12
return f"{ AIRFLOW_HOST } /dags/{ dag_id } /grid?dag_run_id={ dag_run_id } "
15
13
14
+
16
15
def get_task_instance_url (dag_id : str , dag_run_id : str , task_id : str ) -> str :
17
16
return f"{ AIRFLOW_HOST } /dags/{ dag_id } /grid?dag_run_id={ dag_run_id } &task_id={ task_id } "
18
17
18
+
19
19
async def fetch_dags (
20
20
limit : int | None = None ,
21
21
offset : int | None = None ,
@@ -44,69 +44,62 @@ async def fetch_dags(
44
44
45
45
url = f"{ AIRFLOW_HOST } { path } "
46
46
async with httpx .AsyncClient (follow_redirects = True ) as client :
47
- response = await client .get (
48
- url ,
49
- auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ),
50
- params = params
51
- )
47
+ response = await client .get (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ), params = params )
52
48
response .raise_for_status ()
53
49
data = response .json ()
54
-
50
+
55
51
# Add UI links to each DAG
56
52
for dag in data .get ("dags" , []):
57
53
dag ["ui_url" ] = get_dag_url (dag ["dag_id" ])
58
-
54
+
59
55
return [types .TextContent (type = "text" , text = response .text )]
60
56
57
+
61
58
async def get_dag (dag_id : str ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
62
59
path = f"/api/v1/dags/{ dag_id } "
63
60
url = f"{ AIRFLOW_HOST } { path } "
64
61
async with httpx .AsyncClient (follow_redirects = True ) as client :
65
62
response = await client .get (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ))
66
63
response .raise_for_status ()
67
64
data = response .json ()
68
-
65
+
69
66
# Add UI link to DAG
70
67
data ["ui_url" ] = get_dag_url (dag_id )
71
-
68
+
72
69
return [types .TextContent (type = "text" , text = response .text )]
73
70
71
+
74
72
async def pause_dag (dag_id : str ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
75
73
path = f"/api/v1/dags/{ dag_id } "
76
74
url = f"{ AIRFLOW_HOST } { path } "
77
75
async with httpx .AsyncClient (follow_redirects = True ) as client :
78
76
response = await client .patch (
79
- url ,
80
- auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ),
81
- json = {"is_paused" : True }
77
+ url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ), json = {"is_paused" : True }
82
78
)
83
79
response .raise_for_status ()
84
80
return [types .TextContent (type = "text" , text = response .text )]
85
81
82
+
86
83
async def unpause_dag (dag_id : str ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
87
84
path = f"/api/v1/dags/{ dag_id } "
88
85
url = f"{ AIRFLOW_HOST } { path } "
89
86
async with httpx .AsyncClient (follow_redirects = True ) as client :
90
87
response = await client .patch (
91
- url ,
92
- auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ),
93
- json = {"is_paused" : False }
88
+ url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ), json = {"is_paused" : False }
94
89
)
95
90
response .raise_for_status ()
96
91
return [types .TextContent (type = "text" , text = response .text )]
97
92
93
+
98
94
async def trigger_dag (dag_id : str ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
99
95
path = f"/api/v1/dags/{ dag_id } /dagRuns"
100
96
url = f"{ AIRFLOW_HOST } { path } "
101
97
async with httpx .AsyncClient (follow_redirects = True ) as client :
102
- response = await client .post (
103
- url ,
104
- auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ),
105
- json = {}
106
- )
98
+ response = await client .post (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ), json = {})
107
99
response .raise_for_status ()
108
100
return [types .TextContent (type = "text" , text = response .text )]
109
101
102
+
110
103
async def get_dag_runs (
111
104
dag_id : str ,
112
105
limit : int | None = None ,
@@ -151,20 +144,17 @@ async def get_dag_runs(
151
144
152
145
url = f"{ AIRFLOW_HOST } { path } "
153
146
async with httpx .AsyncClient (follow_redirects = True ) as client :
154
- response = await client .get (
155
- url ,
156
- auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ),
157
- params = params
158
- )
147
+ response = await client .get (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ), params = params )
159
148
response .raise_for_status ()
160
149
data = response .json ()
161
-
150
+
162
151
# Add UI links to each DAG run
163
152
for dag_run in data .get ("dag_runs" , []):
164
153
dag_run ["ui_url" ] = get_dag_run_url (dag_id , dag_run ["dag_run_id" ])
165
-
154
+
166
155
return [types .TextContent (type = "text" , text = response .text )]
167
156
157
+
168
158
async def get_dag_tasks (dag_id : str ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
169
159
path = f"/api/v1/dags/{ dag_id } /tasks"
170
160
url = f"{ AIRFLOW_HOST } { path } "
@@ -173,14 +163,18 @@ async def get_dag_tasks(dag_id: str) -> list[types.TextContent | types.ImageCont
173
163
response .raise_for_status ()
174
164
return [types .TextContent (type = "text" , text = response .text )]
175
165
176
- async def get_task_instance (dag_id : str , task_id : str , dag_run_id : str ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
166
+
167
+ async def get_task_instance (
168
+ dag_id : str , task_id : str , dag_run_id : str
169
+ ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
177
170
path = f"/api/v1/dags/{ dag_id } /dagRuns/{ dag_run_id } /taskInstances/{ task_id } "
178
171
url = f"{ AIRFLOW_HOST } { path } "
179
172
async with httpx .AsyncClient (follow_redirects = True ) as client :
180
173
response = await client .get (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ))
181
174
response .raise_for_status ()
182
175
return [types .TextContent (type = "text" , text = response .text )]
183
176
177
+
184
178
async def list_task_instances (
185
179
dag_id : str ,
186
180
dag_run_id : str ,
@@ -235,22 +229,22 @@ async def list_task_instances(
235
229
236
230
url = f"{ AIRFLOW_HOST } { path } "
237
231
async with httpx .AsyncClient (follow_redirects = True ) as client :
238
- response = await client .get (
239
- url ,
240
- auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ),
241
- params = params
242
- )
232
+ response = await client .get (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ), params = params )
243
233
response .raise_for_status ()
244
234
return [types .TextContent (type = "text" , text = response .text )]
245
235
246
- async def get_import_error (import_error_id : int ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
236
+
237
+ async def get_import_error (
238
+ import_error_id : int ,
239
+ ) -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
247
240
path = f"/api/v1/importErrors/{ import_error_id } "
248
241
url = f"{ AIRFLOW_HOST } { path } "
249
242
async with httpx .AsyncClient (follow_redirects = True ) as client :
250
243
response = await client .get (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ))
251
244
response .raise_for_status ()
252
245
return [types .TextContent (type = "text" , text = response .text )]
253
246
247
+
254
248
async def list_import_errors (
255
249
limit : int | None = None ,
256
250
offset : int | None = None ,
@@ -267,14 +261,11 @@ async def list_import_errors(
267
261
268
262
url = f"{ AIRFLOW_HOST } { path } "
269
263
async with httpx .AsyncClient (follow_redirects = True ) as client :
270
- response = await client .get (
271
- url ,
272
- auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ),
273
- params = params
274
- )
264
+ response = await client .get (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ), params = params )
275
265
response .raise_for_status ()
276
266
return [types .TextContent (type = "text" , text = response .text )]
277
267
268
+
278
269
async def get_health () -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
279
270
path = "/api/v1/health"
280
271
url = f"{ AIRFLOW_HOST } { path } "
@@ -283,10 +274,11 @@ async def get_health() -> list[types.TextContent | types.ImageContent | types.Em
283
274
response .raise_for_status ()
284
275
return [types .TextContent (type = "text" , text = response .text )]
285
276
277
+
286
278
async def get_version () -> list [types .TextContent | types .ImageContent | types .EmbeddedResource ]:
287
279
path = "/api/v1/version"
288
280
url = f"{ AIRFLOW_HOST } { path } "
289
281
async with httpx .AsyncClient (follow_redirects = True ) as client :
290
282
response = await client .get (url , auth = httpx .BasicAuth (AIRFLOW_USERNAME , AIRFLOW_PASSWORD ))
291
283
response .raise_for_status ()
292
- return [types .TextContent (type = "text" , text = response .text )]
284
+ return [types .TextContent (type = "text" , text = response .text )]
0 commit comments