42
42
ARGON2_PARAMS = dict (memory_cost = 2 ** 16 , rounds = 4 , parallelism = 2 )
43
43
44
44
45
- def get_one_or_else (query : Query ,
46
- logger : Logger ,
47
- failure_method : Callable [[int ], None ]) -> None :
45
+ def get_one_or_else (query : ' Query' ,
46
+ logger : ' Logger' ,
47
+ failure_method : ' Callable[[int], None]' ) -> None :
48
48
try :
49
49
return query .one ()
50
50
except MultipleResultsFound as e :
@@ -97,7 +97,7 @@ def journalist_filename(self) -> str:
97
97
return '' .join ([c for c in self .journalist_designation .lower ().replace (
98
98
' ' , '_' ) if c in valid_chars ])
99
99
100
- def documents_messages_count (self ) -> Dict [str , int ]:
100
+ def documents_messages_count (self ) -> ' Dict[str, int]' :
101
101
self .docs_msgs_count = {'messages' : 0 , 'documents' : 0 }
102
102
for submission in self .submissions :
103
103
if submission .filename .endswith ('msg.gpg' ):
@@ -108,7 +108,7 @@ def documents_messages_count(self) -> Dict[str, int]:
108
108
return self .docs_msgs_count
109
109
110
110
@property
111
- def collection (self ) -> List [Union [Submission , Reply ]]:
111
+ def collection (self ) -> ' List[Union[Submission, Reply]]' :
112
112
"""Return the list of submissions and replies for this source, sorted
113
113
in ascending order by the filename/interaction count."""
114
114
collection = [] # type: List[Union[Submission, Reply]]
@@ -141,7 +141,7 @@ def public_key(self, value: str) -> None:
141
141
def public_key (self ) -> None :
142
142
raise NotImplementedError
143
143
144
- def to_json (self ) -> Dict [str , Union [str , bool , int , str ]]:
144
+ def to_json (self ) -> ' Dict[str, Union[str, bool, int, str]]' :
145
145
docs_msg_count = self .documents_messages_count ()
146
146
147
147
if self .last_updated :
@@ -212,7 +212,7 @@ def __init__(self, source: Source, filename: str) -> None:
212
212
def __repr__ (self ) -> str :
213
213
return '<Submission %r>' % (self .filename )
214
214
215
- def to_json (self ) -> Dict [str , Union [str , int , bool ]]:
215
+ def to_json (self ) -> ' Dict[str, Union[str, int, bool]]' :
216
216
json_submission = {
217
217
'source_url' : url_for ('api.single_source' ,
218
218
source_uuid = self .source .uuid ),
@@ -260,7 +260,7 @@ class Reply(db.Model):
260
260
deleted_by_source = Column (Boolean , default = False , nullable = False )
261
261
262
262
def __init__ (self ,
263
- journalist : Journalist ,
263
+ journalist : ' Journalist' ,
264
264
source : Source ,
265
265
filename : str ) -> None :
266
266
self .journalist_id = journalist .id
@@ -273,7 +273,7 @@ def __init__(self,
273
273
def __repr__ (self ) -> str :
274
274
return '<Reply %r>' % (self .filename )
275
275
276
- def to_json (self ) -> Dict [str , Union [str , int , bool ]]:
276
+ def to_json (self ) -> ' Dict[str, Union[str, int, bool]]' :
277
277
username = "deleted"
278
278
first_name = ""
279
279
last_name = ""
@@ -307,7 +307,7 @@ class SourceStar(db.Model):
307
307
source_id = Column ("source_id" , Integer , ForeignKey ('sources.id' ))
308
308
starred = Column ("starred" , Boolean , default = True )
309
309
310
- def __eq__ (self , other : Any ) -> bool :
310
+ def __eq__ (self , other : ' Any' ) -> bool :
311
311
if isinstance (other , SourceStar ):
312
312
return (self .source_id == other .source_id and
313
313
self .id == other .id and self .starred == other .starred )
@@ -418,10 +418,10 @@ class Journalist(db.Model):
418
418
def __init__ (self ,
419
419
username : str ,
420
420
password : str ,
421
- first_name : Optional [str ] = None ,
422
- last_name : Optional [str ] = None ,
421
+ first_name : ' Optional[str]' = None ,
422
+ last_name : ' Optional[str]' = None ,
423
423
is_admin : bool = False ,
424
- otp_secret : Optional [str ] = None ) -> None :
424
+ otp_secret : ' Optional[str]' = None ) -> None :
425
425
426
426
self .check_username_acceptable (username )
427
427
self .username = username
@@ -547,14 +547,14 @@ def set_hotp_secret(self, otp_secret: str) -> None:
547
547
self .hotp_counter = 0
548
548
549
549
@property
550
- def totp (self ) -> OTP :
550
+ def totp (self ) -> ' OTP' :
551
551
if self .is_totp :
552
552
return pyotp .TOTP (self .otp_secret )
553
553
else :
554
554
raise ValueError ('{} is not using TOTP' .format (self ))
555
555
556
556
@property
557
- def hotp (self ) -> OTP :
557
+ def hotp (self ) -> ' OTP' :
558
558
if not self .is_totp :
559
559
return pyotp .HOTP (self .otp_secret )
560
560
else :
@@ -677,7 +677,7 @@ def validate_token_is_not_expired_or_invalid(token):
677
677
return True
678
678
679
679
@staticmethod
680
- def validate_api_token_and_get_user (token : str ) -> Union [Journalist , None ]:
680
+ def validate_api_token_and_get_user (token : str ) -> ' Union[Journalist, None]' :
681
681
s = TimedJSONWebSignatureSerializer (current_app .config ['SECRET_KEY' ])
682
682
try :
683
683
data = s .loads (token )
@@ -690,7 +690,7 @@ def validate_api_token_and_get_user(token: str) -> Union[Journalist, None]:
690
690
691
691
return Journalist .query .get (data ['id' ])
692
692
693
- def to_json (self ) -> Dict [str , Union [str , bool , str ]]:
693
+ def to_json (self ) -> ' Dict[str, Union[str, bool, str]]' :
694
694
json_user = {
695
695
'username' : self .username ,
696
696
'last_login' : self .last_access .isoformat () + 'Z' ,
0 commit comments