3
3
import os
4
4
import re
5
5
import shutil
6
+ import unittest
6
7
from time import gmtime , strftime
7
8
from urlparse import urlparse
8
9
14
15
15
16
sys .path .append (os .path .join (os .path .dirname (__file__ ), '..' ,
16
17
'..' , '_beats' , 'libbeat' , 'tests' , 'system' ))
17
- from beat .beat import TestCase , TimeoutError
18
+ from beat .beat import INTEGRATION_TESTS , TestCase , TimeoutError
18
19
20
+ integration_test = unittest .skipUnless (INTEGRATION_TESTS , "integration test" )
19
21
20
22
class BaseTest (TestCase ):
21
23
maxDiff = None
@@ -105,11 +107,26 @@ def setUp(self):
105
107
self .apmserver_proc = self .start_beat (** self .start_args ())
106
108
self .wait_until_started ()
107
109
110
+ # try make sure APM Server is fully up
111
+ cfg = self .config ()
112
+ # pipeline registration is enabled by default and only happens if the output is elasticsearch
113
+ if not getattr (self , "register_pipeline_disabled" , False ) and \
114
+ cfg .get ("elasticsearch_host" ) and \
115
+ cfg .get ("register_pipeline_enabled" ) != "false" and cfg .get ("register_pipeline_overwrite" ) != "false" :
116
+ self .wait_until_pipelines_registered ()
117
+
108
118
def start_args (self ):
109
119
return {}
110
120
111
121
def wait_until_started (self ):
112
- self .wait_until (lambda : self .log_contains ("Starting apm-server" ))
122
+ self .wait_until (lambda : self .log_contains ("Starting apm-server" ), name = "apm-server started" )
123
+
124
+ def wait_until_ilm_setup (self ):
125
+ self .wait_until (lambda : self .log_contains ("Finished index management setup." ), name = "ILM setup" )
126
+
127
+ def wait_until_pipelines_registered (self ):
128
+ self .wait_until (lambda : self .log_contains ("Registered Ingest Pipelines successfully" ),
129
+ name = "pipelines registered" )
113
130
114
131
def assert_no_logged_warnings (self , suppress = None ):
115
132
"""
@@ -130,11 +147,13 @@ def assert_no_logged_warnings(self, suppress=None):
130
147
log = re .sub (s , "" , log )
131
148
self .assertNotRegexpMatches (log , "ERR|WARN" )
132
149
133
- def request_intake (self , data = "" , url = "" , headers = { 'content-type' : 'application/x-ndjson' } ):
134
- if url == "" :
150
+ def request_intake (self , data = None , url = None , headers = None ):
151
+ if not url :
135
152
url = self .intake_url
136
- if data == "" :
153
+ if data is None :
137
154
data = self .get_event_payload ()
155
+ if headers is None :
156
+ headers = {'content-type' : 'application/x-ndjson' }
138
157
return requests .post (url , data = data , headers = headers )
139
158
140
159
@@ -181,20 +200,28 @@ def setUp(self):
181
200
self .kibana_url = self .get_kibana_url ()
182
201
183
202
# Cleanup index and template first
184
- self .es .indices .delete (index = "apm*" , ignore = [400 , 404 ])
185
- for idx in self .indices :
186
- self .wait_until (lambda : not self .es .indices .exists (idx ))
187
-
188
- self .es .indices .delete_template (name = "apm*" , ignore = [400 , 404 ])
189
- for idx in self .indices :
190
- self .wait_until (lambda : not self .es .indices .exists_template (idx ))
203
+ assert all (idx .startswith ("apm" ) for idx in self .indices ), "not all indices prefixed with apm, cleanup assumption broken"
204
+ if self .es .indices .get ("apm*" ):
205
+ self .es .indices .delete (index = "apm*" , ignore = [400 , 404 ])
206
+ for idx in self .indices :
207
+ self .wait_until (lambda : not self .es .indices .exists (idx ), name = "index {} to be deleted" .format (idx ))
208
+
209
+ if self .es .indices .get_template (name = "apm*" , ignore = [400 , 404 ]):
210
+ self .es .indices .delete_template (name = "apm*" , ignore = [400 , 404 ])
211
+ for idx in self .indices :
212
+ self .wait_until (lambda : not self .es .indices .exists_template (idx ),
213
+ name = "index template {} to be deleted" .format (idx ))
191
214
192
215
# truncate, don't delete agent configuration index since it's only created when kibana starts up
193
- self .es .delete_by_query (self .index_acm , {"query" : {"match_all" : {}}},
194
- ignore_unavailable = True , wait_for_completion = True )
195
- self .wait_until (lambda : self .es .count (index = self .index_acm , ignore_unavailable = True )["count" ] == 0 )
216
+ if self .es .count (index = self .index_acm , ignore_unavailable = True )["count" ] > 0 :
217
+ self .es .delete_by_query (self .index_acm , {"query" : {"match_all" : {}}},
218
+ ignore_unavailable = True , wait_for_completion = True )
219
+ self .wait_until (lambda : self .es .count (index = self .index_acm , ignore_unavailable = True )["count" ] == 0 ,
220
+ max_timeout = 30 , name = "acm index {} to be empty" .format (self .index_acm ))
221
+
196
222
# Cleanup pipelines
197
- self .es .ingest .delete_pipeline (id = "*" )
223
+ if self .es .ingest .get_pipeline (ignore = [400 , 404 ]):
224
+ self .es .ingest .delete_pipeline (id = "*" )
198
225
199
226
super (ElasticTest , self ).setUp ()
200
227
@@ -219,7 +246,8 @@ def load_docs_with_template(self, data_path, url, endpoint, expected_events_coun
219
246
lambda : (self .es .count (index = query_index , body = {
220
247
"query" : {"term" : {"processor.name" : endpoint }}}
221
248
)['count' ] == expected_events_count ),
222
- max_timeout = max_timeout
249
+ max_timeout = max_timeout ,
250
+ name = "{} documents to reach {}" .format (endpoint , expected_events_count ),
223
251
)
224
252
225
253
def check_backend_error_sourcemap (self , index , count = 1 ):
@@ -311,7 +339,8 @@ def wait_for_sourcemaps(self, expected_ct=1):
311
339
self .wait_until (
312
340
lambda : (self .es .count (index = idx , body = {
313
341
"query" : {"term" : {"processor.name" : 'sourcemap' }}}
314
- )['count' ] == expected_ct )
342
+ )['count' ] == expected_ct ),
343
+ name = "{} sourcemaps to ingest" .format (expected_ct ),
315
344
)
316
345
317
346
def check_rum_error_sourcemap (self , updated , expected_err = None , count = 1 ):
0 commit comments