11
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
+
14
15
import logging
15
16
import re
16
17
from enum import Enum
17
- from typing import TYPE_CHECKING , Iterable , List , Match , Optional
18
+ from typing import TYPE_CHECKING , Dict , Iterable , List , Optional , Pattern
19
+
20
+ import attr
21
+ from netaddr import IPSet
18
22
19
23
from synapse .api .constants import EventTypes
20
24
from synapse .events import EventBase
@@ -33,6 +37,13 @@ class ApplicationServiceState(Enum):
33
37
UP = "up"
34
38
35
39
40
+ @attr .s (slots = True , frozen = True , auto_attribs = True )
41
+ class Namespace :
42
+ exclusive : bool
43
+ group_id : Optional [str ]
44
+ regex : Pattern [str ]
45
+
46
+
36
47
class ApplicationService :
37
48
"""Defines an application service. This definition is mostly what is
38
49
provided to the /register AS API.
@@ -50,17 +61,17 @@ class ApplicationService:
50
61
51
62
def __init__ (
52
63
self ,
53
- token ,
54
- hostname ,
55
- id ,
56
- sender ,
57
- url = None ,
58
- namespaces = None ,
59
- hs_token = None ,
60
- protocols = None ,
61
- rate_limited = True ,
62
- ip_range_whitelist = None ,
63
- supports_ephemeral = False ,
64
+ token : str ,
65
+ hostname : str ,
66
+ id : str ,
67
+ sender : str ,
68
+ url : Optional [ str ] = None ,
69
+ namespaces : Optional [ JsonDict ] = None ,
70
+ hs_token : Optional [ str ] = None ,
71
+ protocols : Optional [ Iterable [ str ]] = None ,
72
+ rate_limited : bool = True ,
73
+ ip_range_whitelist : Optional [ IPSet ] = None ,
74
+ supports_ephemeral : bool = False ,
64
75
):
65
76
self .token = token
66
77
self .url = (
@@ -85,27 +96,33 @@ def __init__(
85
96
86
97
self .rate_limited = rate_limited
87
98
88
- def _check_namespaces (self , namespaces ):
99
+ def _check_namespaces (
100
+ self , namespaces : Optional [JsonDict ]
101
+ ) -> Dict [str , List [Namespace ]]:
89
102
# Sanity check that it is of the form:
90
103
# {
91
104
# users: [ {regex: "[A-z]+.*", exclusive: true}, ...],
92
105
# aliases: [ {regex: "[A-z]+.*", exclusive: true}, ...],
93
106
# rooms: [ {regex: "[A-z]+.*", exclusive: true}, ...],
94
107
# }
95
- if not namespaces :
108
+ if namespaces is None :
96
109
namespaces = {}
97
110
111
+ result : Dict [str , List [Namespace ]] = {}
112
+
98
113
for ns in ApplicationService .NS_LIST :
114
+ result [ns ] = []
115
+
99
116
if ns not in namespaces :
100
- namespaces [ns ] = []
101
117
continue
102
118
103
- if type (namespaces [ns ]) != list :
119
+ if not isinstance (namespaces [ns ], list ) :
104
120
raise ValueError ("Bad namespace value for '%s'" % ns )
105
121
for regex_obj in namespaces [ns ]:
106
122
if not isinstance (regex_obj , dict ):
107
123
raise ValueError ("Expected dict regex for ns '%s'" % ns )
108
- if not isinstance (regex_obj .get ("exclusive" ), bool ):
124
+ exclusive = regex_obj .get ("exclusive" )
125
+ if not isinstance (exclusive , bool ):
109
126
raise ValueError ("Expected bool for 'exclusive' in ns '%s'" % ns )
110
127
group_id = regex_obj .get ("group_id" )
111
128
if group_id :
@@ -126,22 +143,26 @@ def _check_namespaces(self, namespaces):
126
143
)
127
144
128
145
regex = regex_obj .get ("regex" )
129
- if isinstance (regex , str ):
130
- regex_obj ["regex" ] = re .compile (regex ) # Pre-compile regex
131
- else :
146
+ if not isinstance (regex , str ):
132
147
raise ValueError ("Expected string for 'regex' in ns '%s'" % ns )
133
- return namespaces
134
148
135
- def _matches_regex (self , test_string : str , namespace_key : str ) -> Optional [Match ]:
136
- for regex_obj in self .namespaces [namespace_key ]:
137
- if regex_obj ["regex" ].match (test_string ):
138
- return regex_obj
149
+ # Pre-compile regex.
150
+ result [ns ].append (Namespace (exclusive , group_id , re .compile (regex )))
151
+
152
+ return result
153
+
154
+ def _matches_regex (
155
+ self , namespace_key : str , test_string : str
156
+ ) -> Optional [Namespace ]:
157
+ for namespace in self .namespaces [namespace_key ]:
158
+ if namespace .regex .match (test_string ):
159
+ return namespace
139
160
return None
140
161
141
- def _is_exclusive (self , ns_key : str , test_string : str ) -> bool :
142
- regex_obj = self ._matches_regex (test_string , ns_key )
143
- if regex_obj :
144
- return regex_obj [ " exclusive" ]
162
+ def _is_exclusive (self , namespace_key : str , test_string : str ) -> bool :
163
+ namespace = self ._matches_regex (namespace_key , test_string )
164
+ if namespace :
165
+ return namespace . exclusive
145
166
return False
146
167
147
168
async def _matches_user (
@@ -260,15 +281,15 @@ async def is_interested_in_presence(
260
281
261
282
def is_interested_in_user (self , user_id : str ) -> bool :
262
283
return (
263
- bool (self ._matches_regex (user_id , ApplicationService .NS_USERS ))
284
+ bool (self ._matches_regex (ApplicationService .NS_USERS , user_id ))
264
285
or user_id == self .sender
265
286
)
266
287
267
288
def is_interested_in_alias (self , alias : str ) -> bool :
268
- return bool (self ._matches_regex (alias , ApplicationService .NS_ALIASES ))
289
+ return bool (self ._matches_regex (ApplicationService .NS_ALIASES , alias ))
269
290
270
291
def is_interested_in_room (self , room_id : str ) -> bool :
271
- return bool (self ._matches_regex (room_id , ApplicationService .NS_ROOMS ))
292
+ return bool (self ._matches_regex (ApplicationService .NS_ROOMS , room_id ))
272
293
273
294
def is_exclusive_user (self , user_id : str ) -> bool :
274
295
return (
@@ -285,14 +306,14 @@ def is_exclusive_alias(self, alias: str) -> bool:
285
306
def is_exclusive_room (self , room_id : str ) -> bool :
286
307
return self ._is_exclusive (ApplicationService .NS_ROOMS , room_id )
287
308
288
- def get_exclusive_user_regexes (self ):
309
+ def get_exclusive_user_regexes (self ) -> List [ Pattern [ str ]] :
289
310
"""Get the list of regexes used to determine if a user is exclusively
290
311
registered by the AS
291
312
"""
292
313
return [
293
- regex_obj [ " regex" ]
294
- for regex_obj in self .namespaces [ApplicationService .NS_USERS ]
295
- if regex_obj [ " exclusive" ]
314
+ namespace . regex
315
+ for namespace in self .namespaces [ApplicationService .NS_USERS ]
316
+ if namespace . exclusive
296
317
]
297
318
298
319
def get_groups_for_user (self , user_id : str ) -> Iterable [str ]:
@@ -305,15 +326,15 @@ def get_groups_for_user(self, user_id: str) -> Iterable[str]:
305
326
An iterable that yields group_id strings.
306
327
"""
307
328
return (
308
- regex_obj [ " group_id" ]
309
- for regex_obj in self .namespaces [ApplicationService .NS_USERS ]
310
- if " group_id" in regex_obj and regex_obj [ " regex" ] .match (user_id )
329
+ namespace . group_id
330
+ for namespace in self .namespaces [ApplicationService .NS_USERS ]
331
+ if namespace . group_id and namespace . regex .match (user_id )
311
332
)
312
333
313
334
def is_rate_limited (self ) -> bool :
314
335
return self .rate_limited
315
336
316
- def __str__ (self ):
337
+ def __str__ (self ) -> str :
317
338
# copy dictionary and redact token fields so they don't get logged
318
339
dict_copy = self .__dict__ .copy ()
319
340
dict_copy ["token" ] = "<redacted>"
0 commit comments