2
2
3
3
import mcp .types as types
4
4
from airflow_client .client .api .dag_api import DAGApi
5
-
5
+ from airflow_client .client .model .clear_task_instances import ClearTaskInstances
6
+ from airflow_client .client .model .update_task_instances_state import UpdateTaskInstancesState
7
+ from airflow_client .client .model .dag import DAG
6
8
from src .airflow .airflow_client import api_client
7
9
from src .envs import AIRFLOW_HOST
8
10
11
13
12
14
def get_all_functions () -> list [tuple [Callable , str , str ]]:
13
15
return [
14
- (fetch_dags , "fetch_dags" , "Fetch all DAGs" ),
16
+ (get_dags , "fetch_dags" , "Fetch all DAGs" ),
15
17
(get_dag , "get_dag" , "Get a DAG by ID" ),
18
+ (get_dag_details , "get_dag_details" , "Get a simplified representation of DAG" ),
19
+ (get_dag_source , "get_dag_source" , "Get a source code" ),
16
20
(pause_dag , "pause_dag" , "Pause a DAG by ID" ),
17
21
(unpause_dag , "unpause_dag" , "Unpause a DAG by ID" ),
22
+ (get_dag_tasks , "get_dag_tasks" , "Get tasks for DAG" ),
23
+ (get_task , "get_task" , "Get a task by ID" ),
24
+ (get_tasks , "get_tasks" , "Get tasks for DAG" ),
25
+ (patch_dag , "patch_dag" , "Update a DAG" ),
26
+ (patch_dags , "patch_dags" , "Update multiple DAGs" ),
27
+ (delete_dag , "delete_dag" , "Delete a DAG" ),
28
+ (clear_task_instances , "clear_task_instances" , "Clear a set of task instances" ),
29
+ (set_task_instances_state , "set_task_instances_state" , "Set a state of task instances" ),
30
+ (reparse_dag_file , "reparse_dag_file" , "Request re-parsing of a DAG file" ),
18
31
]
19
32
20
33
21
34
def get_dag_url (dag_id : str ) -> str :
22
35
return f"{ AIRFLOW_HOST } /dags/{ dag_id } /grid"
23
36
24
37
25
- async def fetch_dags (
38
+ async def get_dags (
26
39
limit : Optional [int ] = None ,
27
40
offset : Optional [int ] = None ,
28
41
order_by : Optional [str ] = None ,
@@ -73,6 +86,21 @@ async def get_dag(dag_id: str) -> List[Union[types.TextContent, types.ImageConte
73
86
return [types .TextContent (type = "text" , text = str (response_dict ))]
74
87
75
88
89
+ async def get_dag_details (dag_id : str , fields : Optional [List [str ]] = None ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
90
+ # Build parameters dictionary
91
+ kwargs : Dict [str , Any ] = {}
92
+ if fields is not None :
93
+ kwargs ["fields" ] = fields
94
+
95
+ response = dag_api .get_dag_details (dag_id = dag_id , ** kwargs )
96
+ return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
97
+
98
+
99
+ async def get_dag_source (file_token : str ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
100
+ response = dag_api .get_dag_source (file_token = file_token )
101
+ return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
102
+
103
+
76
104
async def pause_dag (dag_id : str ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
77
105
response = dag_api .patch_dag (dag_id = dag_id , dag_update_request = {"is_paused" : True })
78
106
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
@@ -88,16 +116,45 @@ async def get_dag_tasks(dag_id: str) -> List[Union[types.TextContent, types.Imag
88
116
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
89
117
90
118
91
- async def update_dag (
119
+ async def patch_dag (
92
120
dag_id : str , is_paused : Optional [bool ] = None , tags : Optional [List [str ]] = None
93
121
) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
94
122
update_request = {}
123
+ update_mask = []
124
+
95
125
if is_paused is not None :
96
126
update_request ["is_paused" ] = is_paused
127
+ update_mask .append ("is_paused" )
97
128
if tags is not None :
98
129
update_request ["tags" ] = tags
130
+ update_mask .append ("tags" )
131
+
132
+ dag = DAG (** update_request )
133
+
134
+ response = dag_api .patch_dag (dag_id = dag_id , dag = dag , update_mask = update_mask )
135
+ return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
136
+
137
+
138
+ async def patch_dags (
139
+ dag_id_pattern : Optional [str ] = None , is_paused : Optional [bool ] = None , tags : Optional [List [str ]] = None ,
140
+ ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
141
+ update_request = {}
142
+ update_mask = []
99
143
100
- response = dag_api .patch_dag (dag_id = dag_id , dag_update_request = update_request )
144
+ if is_paused is not None :
145
+ update_request ["is_paused" ] = is_paused
146
+ update_mask .append ("is_paused" )
147
+ if tags is not None :
148
+ update_request ["tags" ] = tags
149
+ update_mask .append ("tags" )
150
+
151
+ dag = DAG (** update_request )
152
+
153
+ kwargs = {}
154
+ if dag_id_pattern is not None :
155
+ kwargs ["dag_id_pattern" ] = dag_id_pattern
156
+
157
+ response = dag_api .patch_dags (dag_id_pattern = dag_id_pattern , dag = dag , update_mask = update_mask , ** kwargs )
101
158
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
102
159
103
160
@@ -111,3 +168,98 @@ async def get_task(
111
168
) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
112
169
response = dag_api .get_task (dag_id = dag_id , task_id = task_id )
113
170
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
171
+
172
+
173
+ async def get_tasks (dag_id : str , order_by : Optional [str ] = None ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
174
+ kwargs = {}
175
+ if order_by is not None :
176
+ kwargs ["order_by" ] = order_by
177
+
178
+ response = dag_api .get_tasks (dag_id = dag_id , ** kwargs )
179
+ return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
180
+
181
+
182
+ async def clear_task_instances (
183
+ dag_id : str ,
184
+ task_ids : Optional [List [str ]] = None ,
185
+ start_date : Optional [str ] = None ,
186
+ end_date : Optional [str ] = None ,
187
+ include_subdags : Optional [bool ] = None ,
188
+ include_parentdag : Optional [bool ] = None ,
189
+ include_upstream : Optional [bool ] = None ,
190
+ include_downstream : Optional [bool ] = None ,
191
+ include_future : Optional [bool ] = None ,
192
+ include_past : Optional [bool ] = None ,
193
+ dry_run : Optional [bool ] = None ,
194
+ reset_dag_runs : Optional [bool ] = None ,
195
+ ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
196
+ clear_request = {}
197
+ if task_ids is not None :
198
+ clear_request ["task_ids" ] = task_ids
199
+ if start_date is not None :
200
+ clear_request ["start_date" ] = start_date
201
+ if end_date is not None :
202
+ clear_request ["end_date" ] = end_date
203
+ if include_subdags is not None :
204
+ clear_request ["include_subdags" ] = include_subdags
205
+ if include_parentdag is not None :
206
+ clear_request ["include_parentdag" ] = include_parentdag
207
+ if include_upstream is not None :
208
+ clear_request ["include_upstream" ] = include_upstream
209
+ if include_downstream is not None :
210
+ clear_request ["include_downstream" ] = include_downstream
211
+ if include_future is not None :
212
+ clear_request ["include_future" ] = include_future
213
+ if include_past is not None :
214
+ clear_request ["include_past" ] = include_past
215
+ if dry_run is not None :
216
+ clear_request ["dry_run" ] = dry_run
217
+ if reset_dag_runs is not None :
218
+ clear_request ["reset_dag_runs" ] = reset_dag_runs
219
+
220
+ clear_task_instances = ClearTaskInstances (** clear_request )
221
+
222
+ response = dag_api .post_clear_task_instances (dag_id = dag_id , clear_task_instances = clear_task_instances )
223
+ return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
224
+
225
+
226
+ async def set_task_instances_state (
227
+ dag_id : str ,
228
+ state : str ,
229
+ task_ids : Optional [List [str ]] = None ,
230
+ execution_date : Optional [str ] = None ,
231
+ include_upstream : Optional [bool ] = None ,
232
+ include_downstream : Optional [bool ] = None ,
233
+ include_future : Optional [bool ] = None ,
234
+ include_past : Optional [bool ] = None ,
235
+ dry_run : Optional [bool ] = None ,
236
+ ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
237
+ state_request = {
238
+ "state" : state
239
+ }
240
+ if task_ids is not None :
241
+ state_request ["task_ids" ] = task_ids
242
+ if execution_date is not None :
243
+ state_request ["execution_date" ] = execution_date
244
+ if include_upstream is not None :
245
+ state_request ["include_upstream" ] = include_upstream
246
+ if include_downstream is not None :
247
+ state_request ["include_downstream" ] = include_downstream
248
+ if include_future is not None :
249
+ state_request ["include_future" ] = include_future
250
+ if include_past is not None :
251
+ state_request ["include_past" ] = include_past
252
+ if dry_run is not None :
253
+ state_request ["dry_run" ] = dry_run
254
+
255
+ update_task_instances_state = UpdateTaskInstancesState (** state_request )
256
+
257
+ response = dag_api .post_set_task_instances_state (dag_id = dag_id , update_task_instances_state = update_task_instances_state )
258
+ return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
259
+
260
+
261
+ async def reparse_dag_file (
262
+ file_token : str
263
+ ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
264
+ response = dag_api .reparse_dag_file (file_token = file_token )
265
+ return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
0 commit comments