15
15
16
16
from os import environ
17
17
from os .path import basename
18
- from time import sleep , time
18
+ import threading
19
+ import time
20
+ import uuid
19
21
20
22
from google .api_core .exceptions import AlreadyExists
21
23
from google .api_core .exceptions import InvalidArgument
35
37
TRY_LIMIT = 20
36
38
37
39
40
+ class MessageReceiver :
41
+ """Custom class to handle incoming Pub/Sub messages."""
42
+ def __init__ (self , expected_msg_nums , done_event ):
43
+ # initialize counter to 0 on initialization
44
+ self .msg_count = 0
45
+ self .expected_msg_nums = expected_msg_nums
46
+ self .done_event = done_event
47
+
48
+ def pubsub_callback (self , message ):
49
+ # every time a pubsub message comes in, print it and count it
50
+ self .msg_count += 1
51
+ print ('Message {}: {}' .format (self .msg_count , message .data ))
52
+ message .ack ()
53
+ if (self .msg_count == self .expected_msg_nums ):
54
+ self .done_event .set ()
55
+
56
+
38
57
class TestContainerAnalysisSamples :
39
58
40
59
def setup_method (self , test_method ):
41
60
print ('SETUP {}' .format (test_method .__name__ ))
42
- timestamp = str (int (time ()))
43
- self .note_id = 'note-{}-{}' .format (timestamp , test_method .__name__ )
44
- self .image_url = '{}.{}' .format (timestamp , test_method .__name__ )
61
+ self .note_id = 'note-{}' .format (uuid .uuid4 ())
62
+ self .image_url = '{}.{}' .format (uuid .uuid4 (), test_method .__name__ )
45
63
self .note_obj = samples .create_note (self .note_id , PROJECT_ID )
46
64
47
65
def teardown_method (self , test_method ):
@@ -102,7 +120,7 @@ def test_occurrences_for_image(self):
102
120
tries += 1
103
121
new_count = samples .get_occurrences_for_image (self .image_url ,
104
122
PROJECT_ID )
105
- sleep (SLEEP_TIME )
123
+ time . sleep (SLEEP_TIME )
106
124
assert new_count == 1
107
125
assert orig_count == 0
108
126
# clean up
@@ -121,7 +139,7 @@ def test_occurrences_for_note(self):
121
139
tries += 1
122
140
new_count = samples .get_occurrences_for_note (self .note_id ,
123
141
PROJECT_ID )
124
- sleep (SLEEP_TIME )
142
+ time . sleep (SLEEP_TIME )
125
143
assert new_count == 1
126
144
assert orig_count == 0
127
145
# clean up
@@ -138,33 +156,31 @@ def test_pubsub(self):
138
156
except AlreadyExists :
139
157
pass
140
158
141
- subscription_id = 'drydockOccurrences'
159
+ subscription_id = 'container-analysis-test-{}' . format ( uuid . uuid4 ())
142
160
subscription_name = client .subscription_path (PROJECT_ID ,
143
161
subscription_id )
144
162
samples .create_occurrence_subscription (subscription_id , PROJECT_ID )
145
- tries = 0
146
- success = False
147
- while not success and tries < TRY_LIMIT :
148
- print (tries )
149
- tries += 1
150
- receiver = samples .MessageReceiver ()
163
+
164
+ # I can not make it pass with multiple messages. My guess is
165
+ # the server started to dedup?
166
+ message_count = 1
167
+ try :
168
+ job_done = threading .Event ()
169
+ receiver = MessageReceiver (message_count , job_done )
151
170
client .subscribe (subscription_name , receiver .pubsub_callback )
152
171
153
- # test adding 3 more occurrences
154
- total_created = 3
155
- for _ in range (total_created ):
156
- occ = samples .create_occurrence (self .image_url ,
157
- self .note_id ,
158
- PROJECT_ID ,
159
- PROJECT_ID )
160
- sleep (SLEEP_TIME )
172
+ for i in range (message_count ):
173
+ occ = samples .create_occurrence (
174
+ self .image_url , self .note_id , PROJECT_ID , PROJECT_ID )
175
+ time .sleep (SLEEP_TIME )
161
176
samples .delete_occurrence (basename (occ .name ), PROJECT_ID )
162
- sleep (SLEEP_TIME )
177
+ time .sleep (SLEEP_TIME )
178
+ job_done .wait (timeout = 60 )
163
179
print ('done. msg_count = {}' .format (receiver .msg_count ))
164
- success = receiver .msg_count == total_created
165
- assert receiver . msg_count == total_created
166
- # clean up
167
- client .delete_subscription (subscription_name )
180
+ assert message_count < = receiver .msg_count
181
+ finally :
182
+ # clean up
183
+ client .delete_subscription (subscription_name )
168
184
169
185
def test_poll_discovery_occurrence (self ):
170
186
# try with no discovery occurrence
@@ -177,7 +193,7 @@ def test_poll_discovery_occurrence(self):
177
193
assert False
178
194
179
195
# create discovery occurrence
180
- note_id = 'discovery-note-{}' .format (int ( time () ))
196
+ note_id = 'discovery-note-{}' .format (uuid . uuid4 ( ))
181
197
client = containeranalysis_v1 .ContainerAnalysisClient ()
182
198
grafeas_client = client .get_grafeas_client ()
183
199
note = {
@@ -225,7 +241,7 @@ def test_find_vulnerabilities_for_image(self):
225
241
occ_list = samples .find_vulnerabilities_for_image (self .image_url ,
226
242
PROJECT_ID )
227
243
count = len (occ_list )
228
- sleep (SLEEP_TIME )
244
+ time . sleep (SLEEP_TIME )
229
245
assert len (occ_list ) == 1
230
246
samples .delete_occurrence (basename (created .name ), PROJECT_ID )
231
247
@@ -236,7 +252,7 @@ def test_find_high_severity_vulnerabilities(self):
236
252
assert len (occ_list ) == 0
237
253
238
254
# create new high severity vulnerability
239
- note_id = 'discovery-note-{}' .format (int ( time () ))
255
+ note_id = 'discovery-note-{}' .format (uuid . uuid4 ( ))
240
256
client = containeranalysis_v1 .ContainerAnalysisClient ()
241
257
grafeas_client = client .get_grafeas_client ()
242
258
note = {
@@ -287,7 +303,7 @@ def test_find_high_severity_vulnerabilities(self):
287
303
occ_list = samples .find_vulnerabilities_for_image (self .image_url ,
288
304
PROJECT_ID )
289
305
count = len (occ_list )
290
- sleep (SLEEP_TIME )
306
+ time . sleep (SLEEP_TIME )
291
307
assert len (occ_list ) == 1
292
308
# clean up
293
309
samples .delete_occurrence (basename (created .name ), PROJECT_ID )
0 commit comments