1
- from typing import Any , Dict , List , Optional , Union , Callable
1
+ from typing import Any , Callable , Dict , List , Optional , Union
2
2
3
3
import mcp .types as types
4
4
from airflow_client .client .api .dag_api import DAGApi
5
5
from airflow_client .client .model .clear_task_instances import ClearTaskInstances
6
- from airflow_client .client .model .update_task_instances_state import UpdateTaskInstancesState
7
6
from airflow_client .client .model .dag import DAG
7
+ from airflow_client .client .model .update_task_instances_state import UpdateTaskInstancesState
8
+
8
9
from src .airflow .airflow_client import api_client
9
10
from src .envs import AIRFLOW_HOST
10
11
@@ -86,12 +87,14 @@ async def get_dag(dag_id: str) -> List[Union[types.TextContent, types.ImageConte
86
87
return [types .TextContent (type = "text" , text = str (response_dict ))]
87
88
88
89
89
- async def get_dag_details (dag_id : str , fields : Optional [List [str ]] = None ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
90
+ async def get_dag_details (
91
+ dag_id : str , fields : Optional [List [str ]] = None
92
+ ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
90
93
# Build parameters dictionary
91
94
kwargs : Dict [str , Any ] = {}
92
95
if fields is not None :
93
96
kwargs ["fields" ] = fields
94
-
97
+
95
98
response = dag_api .get_dag_details (dag_id = dag_id , ** kwargs )
96
99
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
97
100
@@ -136,11 +139,13 @@ async def patch_dag(
136
139
137
140
138
141
async def patch_dags (
139
- dag_id_pattern : Optional [str ] = None , is_paused : Optional [bool ] = None , tags : Optional [List [str ]] = None ,
142
+ dag_id_pattern : Optional [str ] = None ,
143
+ is_paused : Optional [bool ] = None ,
144
+ tags : Optional [List [str ]] = None ,
140
145
) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
141
146
update_request = {}
142
147
update_mask = []
143
-
148
+
144
149
if is_paused is not None :
145
150
update_request ["is_paused" ] = is_paused
146
151
update_mask .append ("is_paused" )
@@ -153,7 +158,7 @@ async def patch_dags(
153
158
kwargs = {}
154
159
if dag_id_pattern is not None :
155
160
kwargs ["dag_id_pattern" ] = dag_id_pattern
156
-
161
+
157
162
response = dag_api .patch_dags (dag_id_pattern = dag_id_pattern , dag = dag , update_mask = update_mask , ** kwargs )
158
163
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
159
164
@@ -170,7 +175,9 @@ async def get_task(
170
175
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
171
176
172
177
173
- async def get_tasks (dag_id : str , order_by : Optional [str ] = None ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
178
+ async def get_tasks (
179
+ dag_id : str , order_by : Optional [str ] = None
180
+ ) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
174
181
kwargs = {}
175
182
if order_by is not None :
176
183
kwargs ["order_by" ] = order_by
@@ -218,7 +225,7 @@ async def clear_task_instances(
218
225
clear_request ["reset_dag_runs" ] = reset_dag_runs
219
226
220
227
clear_task_instances = ClearTaskInstances (** clear_request )
221
-
228
+
222
229
response = dag_api .post_clear_task_instances (dag_id = dag_id , clear_task_instances = clear_task_instances )
223
230
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
224
231
@@ -234,9 +241,7 @@ async def set_task_instances_state(
234
241
include_past : Optional [bool ] = None ,
235
242
dry_run : Optional [bool ] = None ,
236
243
) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
237
- state_request = {
238
- "state" : state
239
- }
244
+ state_request = {"state" : state }
240
245
if task_ids is not None :
241
246
state_request ["task_ids" ] = task_ids
242
247
if execution_date is not None :
@@ -254,12 +259,15 @@ async def set_task_instances_state(
254
259
255
260
update_task_instances_state = UpdateTaskInstancesState (** state_request )
256
261
257
- response = dag_api .post_set_task_instances_state (dag_id = dag_id , update_task_instances_state = update_task_instances_state )
262
+ response = dag_api .post_set_task_instances_state (
263
+ dag_id = dag_id ,
264
+ update_task_instances_state = update_task_instances_state ,
265
+ )
258
266
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
259
267
260
268
261
269
async def reparse_dag_file (
262
- file_token : str
270
+ file_token : str ,
263
271
) -> List [Union [types .TextContent , types .ImageContent , types .EmbeddedResource ]]:
264
272
response = dag_api .reparse_dag_file (file_token = file_token )
265
273
return [types .TextContent (type = "text" , text = str (response .to_dict ()))]
0 commit comments