6
6
7
7
package org .elasticsearch .xpack .transform .integration ;
8
8
9
+ import org .apache .logging .log4j .Level ;
9
10
import org .elasticsearch .action .admin .cluster .node .tasks .list .ListTasksRequest ;
10
11
import org .elasticsearch .action .admin .indices .refresh .RefreshRequest ;
11
12
import org .elasticsearch .action .bulk .BulkRequest ;
12
13
import org .elasticsearch .action .bulk .BulkResponse ;
13
14
import org .elasticsearch .action .index .IndexRequest ;
15
+ import org .elasticsearch .action .search .SearchRequest ;
16
+ import org .elasticsearch .action .search .SearchResponse ;
14
17
import org .elasticsearch .client .RequestOptions ;
15
18
import org .elasticsearch .client .RestHighLevelClient ;
16
19
import org .elasticsearch .client .core .AcknowledgedResponse ;
48
51
import org .elasticsearch .common .xcontent .XContentHelper ;
49
52
import org .elasticsearch .common .xcontent .XContentParser ;
50
53
import org .elasticsearch .common .xcontent .XContentType ;
54
+ import org .elasticsearch .index .query .MatchAllQueryBuilder ;
51
55
import org .elasticsearch .index .query .QueryBuilder ;
52
56
import org .elasticsearch .index .query .QueryBuilders ;
57
+ import org .elasticsearch .search .SearchHit ;
53
58
import org .elasticsearch .search .SearchModule ;
54
59
import org .elasticsearch .search .aggregations .AggregatorFactories ;
55
60
import org .elasticsearch .search .aggregations .bucket .histogram .DateHistogramInterval ;
61
+ import org .elasticsearch .search .builder .SearchSourceBuilder ;
62
+ import org .elasticsearch .search .sort .SortOrder ;
56
63
import org .elasticsearch .test .rest .ESRestTestCase ;
64
+ import org .joda .time .Instant ;
57
65
58
66
import java .io .IOException ;
59
67
import java .nio .charset .StandardCharsets ;
62
70
import java .util .Collections ;
63
71
import java .util .HashMap ;
64
72
import java .util .List ;
73
+ import java .util .Locale ;
65
74
import java .util .Map ;
66
75
import java .util .concurrent .TimeUnit ;
67
76
@@ -73,10 +82,36 @@ abstract class TransformIntegTestCase extends ESRestTestCase {
73
82
private Map <String , TransformConfig > transformConfigs = new HashMap <>();
74
83
75
84
protected void cleanUp () throws IOException {
85
+ logAudits ();
76
86
cleanUpTransforms ();
77
87
waitForPendingTasks ();
78
88
}
79
89
90
+ private void logAudits () throws IOException {
91
+ RestHighLevelClient restClient = new TestRestHighLevelClient ();
92
+
93
+ // using '*' to make this lenient and do not fail if the audit index does not exist
94
+ SearchRequest searchRequest = new SearchRequest (".transform-notifications-*" );
95
+ searchRequest .source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()).size (100 ).sort ("timestamp" , SortOrder .ASC ));
96
+
97
+ restClient .indices ().refresh (new RefreshRequest (searchRequest .indices ()), RequestOptions .DEFAULT );
98
+
99
+ SearchResponse searchResponse = restClient .search (searchRequest , RequestOptions .DEFAULT );
100
+
101
+ for (SearchHit hit : searchResponse .getHits ()) {
102
+ Map <String , Object > source = hit .getSourceAsMap ();
103
+ String level = (String ) source .getOrDefault ("level" , "info" );
104
+ logger .log (
105
+ Level .getLevel (level .toUpperCase (Locale .ROOT )),
106
+ "Transform audit: [{}] [{}] [{}] [{}]" ,
107
+ Instant .ofEpochMilli ((long ) source .getOrDefault ("timestamp" , 0 )),
108
+ source .getOrDefault ("transform_id" , "n/a" ),
109
+ source .getOrDefault ("message" , "n/a" ),
110
+ source .getOrDefault ("node_name" , "n/a" )
111
+ );
112
+ }
113
+ }
114
+
80
115
protected void cleanUpTransforms () throws IOException {
81
116
for (TransformConfig config : transformConfigs .values ()) {
82
117
stopTransform (config .getId ());
@@ -89,10 +124,8 @@ protected StopTransformResponse stopTransform(String id) throws IOException {
89
124
return stopTransform (id , true , null , false );
90
125
}
91
126
92
- protected StopTransformResponse stopTransform (String id ,
93
- boolean waitForCompletion ,
94
- TimeValue timeout ,
95
- boolean waitForCheckpoint ) throws IOException {
127
+ protected StopTransformResponse stopTransform (String id , boolean waitForCompletion , TimeValue timeout , boolean waitForCheckpoint )
128
+ throws IOException {
96
129
RestHighLevelClient restClient = new TestRestHighLevelClient ();
97
130
return restClient .transform ()
98
131
.stopTransform (new StopTransformRequest (id , waitForCompletion , timeout , waitForCheckpoint ), RequestOptions .DEFAULT );
@@ -105,8 +138,7 @@ protected StartTransformResponse startTransform(String id, RequestOptions option
105
138
106
139
protected AcknowledgedResponse deleteTransform (String id ) throws IOException {
107
140
RestHighLevelClient restClient = new TestRestHighLevelClient ();
108
- AcknowledgedResponse response =
109
- restClient .transform ().deleteTransform (new DeleteTransformRequest (id ), RequestOptions .DEFAULT );
141
+ AcknowledgedResponse response = restClient .transform ().deleteTransform (new DeleteTransformRequest (id ), RequestOptions .DEFAULT );
110
142
if (response .isAcknowledged ()) {
111
143
transformConfigs .remove (id );
112
144
}
@@ -118,8 +150,7 @@ protected AcknowledgedResponse putTransform(TransformConfig config, RequestOptio
118
150
throw new IllegalArgumentException ("transform [" + config .getId () + "] is already registered" );
119
151
}
120
152
RestHighLevelClient restClient = new TestRestHighLevelClient ();
121
- AcknowledgedResponse response =
122
- restClient .transform ().putTransform (new PutTransformRequest (config ), options );
153
+ AcknowledgedResponse response = restClient .transform ().putTransform (new PutTransformRequest (config ), options );
123
154
if (response .isAcknowledged ()) {
124
155
transformConfigs .put (config .getId (), config );
125
156
}
@@ -141,30 +172,33 @@ protected void waitUntilCheckpoint(String id, long checkpoint) throws Exception
141
172
}
142
173
143
174
protected void waitUntilCheckpoint (String id , long checkpoint , TimeValue waitTime ) throws Exception {
144
- assertBusy (() ->
145
- assertEquals (checkpoint , getTransformStats (id )
146
- .getTransformsStats ()
147
- .get (0 )
148
- .getCheckpointingInfo ()
149
- .getLast ()
150
- .getCheckpoint ()),
175
+ assertBusy (
176
+ () -> assertEquals (
177
+ checkpoint ,
178
+ getTransformStats (id ).getTransformsStats ().get (0 ).getCheckpointingInfo ().getLast ().getCheckpoint ()
179
+ ),
151
180
waitTime .getMillis (),
152
- TimeUnit .MILLISECONDS );
181
+ TimeUnit .MILLISECONDS
182
+ );
153
183
}
154
184
155
- protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterval (String field ,
156
- DateHistogramInterval interval ,
157
- ZoneId zone ) {
185
+ protected DateHistogramGroupSource createDateHistogramGroupSourceWithFixedInterval (
186
+ String field ,
187
+ DateHistogramInterval interval ,
188
+ ZoneId zone
189
+ ) {
158
190
DateHistogramGroupSource .Builder builder = DateHistogramGroupSource .builder ()
159
191
.setField (field )
160
192
.setInterval (new DateHistogramGroupSource .FixedInterval (interval ))
161
193
.setTimeZone (zone );
162
194
return builder .build ();
163
195
}
164
196
165
- protected DateHistogramGroupSource createDateHistogramGroupSourceWithCalendarInterval (String field ,
166
- DateHistogramInterval interval ,
167
- ZoneId zone ) {
197
+ protected DateHistogramGroupSource createDateHistogramGroupSourceWithCalendarInterval (
198
+ String field ,
199
+ DateHistogramInterval interval ,
200
+ ZoneId zone
201
+ ) {
168
202
DateHistogramGroupSource .Builder builder = DateHistogramGroupSource .builder ()
169
203
.setField (field )
170
204
.setInterval (new DateHistogramGroupSource .CalendarInterval (interval ))
@@ -188,35 +222,38 @@ protected AggregationConfig createAggConfig(AggregatorFactories.Builder aggregat
188
222
return new AggregationConfig (aggregations );
189
223
}
190
224
191
- protected PivotConfig createPivotConfig (Map <String , SingleGroupSource > groups ,
192
- AggregatorFactories . Builder aggregations ) throws Exception {
225
+ protected PivotConfig createPivotConfig (Map <String , SingleGroupSource > groups , AggregatorFactories . Builder aggregations )
226
+ throws Exception {
193
227
return createPivotConfig (groups , aggregations , null );
194
228
}
195
229
196
- protected PivotConfig createPivotConfig (Map <String , SingleGroupSource > groups ,
197
- AggregatorFactories .Builder aggregations ,
198
- Integer size ) throws Exception {
230
+ protected PivotConfig createPivotConfig (Map <String , SingleGroupSource > groups , AggregatorFactories .Builder aggregations , Integer size )
231
+ throws Exception {
199
232
PivotConfig .Builder builder = PivotConfig .builder ()
200
233
.setGroups (createGroupConfig (groups ))
201
234
.setAggregationConfig (createAggConfig (aggregations ))
202
235
.setMaxPageSearchSize (size );
203
236
return builder .build ();
204
237
}
205
238
206
- protected TransformConfig createTransformConfig (String id ,
207
- Map <String , SingleGroupSource > groups ,
208
- AggregatorFactories .Builder aggregations ,
209
- String destinationIndex ,
210
- String ... sourceIndices ) throws Exception {
239
+ protected TransformConfig createTransformConfig (
240
+ String id ,
241
+ Map <String , SingleGroupSource > groups ,
242
+ AggregatorFactories .Builder aggregations ,
243
+ String destinationIndex ,
244
+ String ... sourceIndices
245
+ ) throws Exception {
211
246
return createTransformConfig (id , groups , aggregations , destinationIndex , QueryBuilders .matchAllQuery (), sourceIndices );
212
247
}
213
248
214
- protected TransformConfig .Builder createTransformConfigBuilder (String id ,
215
- Map <String , SingleGroupSource > groups ,
216
- AggregatorFactories .Builder aggregations ,
217
- String destinationIndex ,
218
- QueryBuilder queryBuilder ,
219
- String ... sourceIndices ) throws Exception {
249
+ protected TransformConfig .Builder createTransformConfigBuilder (
250
+ String id ,
251
+ Map <String , SingleGroupSource > groups ,
252
+ AggregatorFactories .Builder aggregations ,
253
+ String destinationIndex ,
254
+ QueryBuilder queryBuilder ,
255
+ String ... sourceIndices
256
+ ) throws Exception {
220
257
return TransformConfig .builder ()
221
258
.setId (id )
222
259
.setSource (SourceConfig .builder ().setIndex (sourceIndices ).setQueryConfig (createQueryConfig (queryBuilder )).build ())
@@ -226,12 +263,14 @@ protected TransformConfig.Builder createTransformConfigBuilder(String id,
226
263
.setDescription ("Test transform config id: " + id );
227
264
}
228
265
229
- protected TransformConfig createTransformConfig (String id ,
230
- Map <String , SingleGroupSource > groups ,
231
- AggregatorFactories .Builder aggregations ,
232
- String destinationIndex ,
233
- QueryBuilder queryBuilder ,
234
- String ... sourceIndices ) throws Exception {
266
+ protected TransformConfig createTransformConfig (
267
+ String id ,
268
+ Map <String , SingleGroupSource > groups ,
269
+ AggregatorFactories .Builder aggregations ,
270
+ String destinationIndex ,
271
+ QueryBuilder queryBuilder ,
272
+ String ... sourceIndices
273
+ ) throws Exception {
235
274
return createTransformConfigBuilder (id , groups , aggregations , destinationIndex , queryBuilder , sourceIndices ).build ();
236
275
}
237
276
@@ -272,8 +311,8 @@ protected void createReviewsIndex(String indexName, int numDocs) throws Exceptio
272
311
.endObject ();
273
312
}
274
313
builder .endObject ();
275
- CreateIndexResponse response =
276
- restClient . indices () .create (new CreateIndexRequest (indexName ).mapping (builder ), RequestOptions .DEFAULT );
314
+ CreateIndexResponse response = restClient . indices ()
315
+ .create (new CreateIndexRequest (indexName ).mapping (builder ), RequestOptions .DEFAULT );
277
316
assertThat (response .isAcknowledged (), is (true ));
278
317
}
279
318
@@ -320,10 +359,14 @@ protected void createReviewsIndex(String indexName, int numDocs) throws Exceptio
320
359
321
360
protected Map <String , Object > toLazy (ToXContent parsedObject ) throws Exception {
322
361
BytesReference bytes = XContentHelper .toXContent (parsedObject , XContentType .JSON , false );
323
- try (XContentParser parser = XContentHelper .createParser (xContentRegistry (),
324
- DeprecationHandler .THROW_UNSUPPORTED_OPERATION ,
325
- bytes ,
326
- XContentType .JSON )) {
362
+ try (
363
+ XContentParser parser = XContentHelper .createParser (
364
+ xContentRegistry (),
365
+ DeprecationHandler .THROW_UNSUPPORTED_OPERATION ,
366
+ bytes ,
367
+ XContentType .JSON
368
+ )
369
+ ) {
327
370
return parser .mapOrdered ();
328
371
}
329
372
}
@@ -349,16 +392,15 @@ protected NamedXContentRegistry xContentRegistry() {
349
392
350
393
@ Override
351
394
protected Settings restClientSettings () {
352
- final String token = "Basic " +
353
- Base64 .getEncoder ().encodeToString (("x_pack_rest_user:x-pack-test-password" ).getBytes (StandardCharsets .UTF_8 ));
354
- return Settings .builder ()
355
- .put (ThreadContext .PREFIX + ".Authorization" , token )
356
- .build ();
395
+ final String token = "Basic "
396
+ + Base64 .getEncoder ().encodeToString (("x_pack_rest_user:x-pack-test-password" ).getBytes (StandardCharsets .UTF_8 ));
397
+ return Settings .builder ().put (ThreadContext .PREFIX + ".Authorization" , token ).build ();
357
398
}
358
399
359
400
protected static class TestRestHighLevelClient extends RestHighLevelClient {
360
- private static final List <NamedXContentRegistry .Entry > X_CONTENT_ENTRIES =
361
- new SearchModule (Settings .EMPTY , Collections .emptyList ()).getNamedXContents ();
401
+ private static final List <NamedXContentRegistry .Entry > X_CONTENT_ENTRIES = new SearchModule (Settings .EMPTY , Collections .emptyList ())
402
+ .getNamedXContents ();
403
+
362
404
TestRestHighLevelClient () {
363
405
super (client (), restClient -> {}, X_CONTENT_ENTRIES );
364
406
}
0 commit comments