13
13
TaintedNode
14
14
)
15
15
from ..helper_visitors import (
16
+ CallVisitor ,
16
17
RHSVisitor ,
17
18
VarsVisitor
18
19
)
19
- from .trigger_definitions_parser import parse
20
+ from .trigger_definitions_parser import parse , Source
20
21
from .vulnerability_helper import (
21
22
Sanitiser ,
22
23
TriggerNode ,
@@ -49,8 +50,7 @@ def identify_triggers(
49
50
tainted_nodes = filter_cfg_nodes (cfg , TaintedNode )
50
51
tainted_trigger_nodes = [
51
52
TriggerNode (
52
- 'Framework function URL parameter' ,
53
- sanitisers = None ,
53
+ Source ('Framework function URL parameter' ),
54
54
cfg_node = node
55
55
) for node in tainted_nodes
56
56
]
@@ -142,7 +142,7 @@ def find_triggers(
142
142
143
143
Args:
144
144
nodes(list[Node]): the nodes to find triggers in.
145
- trigger_word_list(list[string ]): list of trigger words to look for.
145
+ trigger_word_list(list[Union[Sink, Source] ]): list of trigger words to look for.
146
146
nosec_lines(set): lines with # nosec whitelisting
147
147
148
148
Returns:
@@ -157,23 +157,21 @@ def find_triggers(
157
157
158
158
def label_contains (
159
159
node ,
160
- trigger_words
160
+ triggers
161
161
):
162
162
"""Determine if node contains any of the trigger_words provided.
163
163
164
164
Args:
165
165
node(Node): CFG node to check.
166
- trigger_words(list[string ]): list of trigger words to look for.
166
+ trigger_words(list[Union[Sink, Source] ]): list of trigger words to look for.
167
167
168
168
Returns:
169
169
Iterable of TriggerNodes found. Can be multiple because multiple
170
170
trigger_words can be in one node.
171
171
"""
172
- for trigger_word_tuple in trigger_words :
173
- if trigger_word_tuple [0 ] in node .label :
174
- trigger_word = trigger_word_tuple [0 ]
175
- sanitisers = trigger_word_tuple [1 ]
176
- yield TriggerNode (trigger_word , sanitisers , node )
172
+ for trigger in triggers :
173
+ if trigger .trigger_word in node .label :
174
+ yield TriggerNode (trigger , node )
177
175
178
176
179
177
def build_sanitiser_node_dict (
@@ -243,6 +241,37 @@ def get_sink_args(cfg_node):
243
241
return vv .result
244
242
245
243
244
+ def get_sink_args_which_propagate (sink , ast_node ):
245
+ sink_args_with_positions = CallVisitor .get_call_visit_results (sink .trigger .call , ast_node )
246
+ sink_args = []
247
+
248
+ for i , vars in enumerate (sink_args_with_positions .args ):
249
+ if sink .trigger .arg_propagates (i ):
250
+ sink_args .extend (vars )
251
+
252
+ if (
253
+ # Either any unspecified arg propagates
254
+ not sink .trigger .arg_list_propagates or
255
+ # or there are some propagating args which weren't passed positionally
256
+ any (1 for position in sink .trigger .arg_list if position >= len (sink_args_with_positions .args ))
257
+ ):
258
+ sink_args .extend (sink_args_with_positions .unknown_args )
259
+
260
+ for keyword , vars in sink_args_with_positions .kwargs .items ():
261
+ if sink .trigger .kwarg_propagates (keyword ):
262
+ sink_args .extend (vars )
263
+
264
+ if (
265
+ # Either any unspecified kwarg propagates
266
+ not sink .trigger .kwarg_list_propagates or
267
+ # or there are some propagating kwargs which have not been passed by keyword
268
+ sink .trigger .kwarg_list - set (sink_args_with_positions .kwargs .keys ())
269
+ ):
270
+ sink_args .extend (sink_args_with_positions .unknown_kwargs )
271
+
272
+ return sink_args
273
+
274
+
246
275
def get_vulnerability_chains (
247
276
current_node ,
248
277
sink ,
@@ -377,10 +406,14 @@ def get_vulnerability(
377
406
sink .cfg_node )]
378
407
nodes_in_constaint .append (source .cfg_node )
379
408
380
- sink_args = get_sink_args (sink .cfg_node )
409
+ if sink .trigger .all_arguments_propagate_taint :
410
+ sink_args = get_sink_args (sink .cfg_node )
411
+ else :
412
+ sink_args = get_sink_args_which_propagate (sink , sink .cfg_node .ast_node )
413
+
381
414
tainted_node_in_sink_arg = get_tainted_node_in_sink_args (
382
415
sink_args ,
383
- nodes_in_constaint
416
+ nodes_in_constaint ,
384
417
)
385
418
386
419
if tainted_node_in_sink_arg :
0 commit comments