@@ -246,3 +246,120 @@ async fn test_sqs_garbage_collect() {
246
246
247
247
sandbox. shutdown ( ) . await . unwrap ( ) ;
248
248
}
249
+
250
+ // this source update test is done here because SQS is the only long running
251
+ // configurable source for which we have integration tests set up.
252
+ #[ tokio:: test]
253
+ async fn test_update_source_multi_node_cluster ( ) {
254
+ quickwit_common:: setup_logging_for_tests ( ) ;
255
+ let index_id = "test-update-source-cluster" ;
256
+ let sqs_client = sqs_test_helpers:: get_localstack_sqs_client ( ) . await . unwrap ( ) ;
257
+ let queue_url = sqs_test_helpers:: create_queue ( & sqs_client, "test-update-source-cluster" ) . await ;
258
+
259
+ let sandbox = ClusterSandboxBuilder :: default ( )
260
+ . add_node ( [ QuickwitService :: Searcher ] )
261
+ . add_node ( [ QuickwitService :: Metastore ] )
262
+ . add_node ( [ QuickwitService :: Indexer ] )
263
+ . add_node ( [ QuickwitService :: ControlPlane ] )
264
+ . add_node ( [ QuickwitService :: Janitor ] )
265
+ . build_and_start ( )
266
+ . await ;
267
+
268
+ {
269
+ // Wait for indexer to fully start.
270
+ // The starting time is a bit long for a cluster.
271
+ tokio:: time:: sleep ( Duration :: from_secs ( 3 ) ) . await ;
272
+ let indexing_service_counters = sandbox
273
+ . rest_client ( QuickwitService :: Indexer )
274
+ . node_stats ( )
275
+ . indexing ( )
276
+ . await
277
+ . unwrap ( ) ;
278
+ assert_eq ! ( indexing_service_counters. num_running_pipelines, 0 ) ;
279
+ }
280
+
281
+ // Create an index
282
+ let index_config = format ! (
283
+ r#"
284
+ version: 0.8
285
+ index_id: {}
286
+ doc_mapping:
287
+ field_mappings:
288
+ - name: body
289
+ type: text
290
+ indexing_settings:
291
+ commit_timeout_secs: 1
292
+ "# ,
293
+ index_id
294
+ ) ;
295
+ sandbox
296
+ . rest_client ( QuickwitService :: Indexer )
297
+ . indexes ( )
298
+ . create ( index_config, ConfigFormat :: Yaml , false )
299
+ . await
300
+ . unwrap ( ) ;
301
+
302
+ // Wait until indexing pipelines are started
303
+ sandbox. wait_for_indexing_pipelines ( 1 ) . await . unwrap ( ) ;
304
+
305
+ // create an SQS source with 1 pipeline
306
+ let source_id: & str = "test-update-source-cluster" ;
307
+ let source_config_input = format ! (
308
+ r#"
309
+ version: 0.7
310
+ source_id: {}
311
+ desired_num_pipelines: 1
312
+ max_num_pipelines_per_indexer: 1
313
+ source_type: file
314
+ params:
315
+ notifications:
316
+ - type: sqs
317
+ queue_url: {}
318
+ message_type: raw_uri
319
+ deduplication_window_max_messages: 5
320
+ deduplication_cleanup_interval_secs: 3
321
+ input_format: plain_text
322
+ "# ,
323
+ source_id, queue_url
324
+ ) ;
325
+ sandbox
326
+ . rest_client ( QuickwitService :: Indexer )
327
+ . sources ( index_id)
328
+ . create ( source_config_input, ConfigFormat :: Yaml )
329
+ . await
330
+ . unwrap ( ) ;
331
+
332
+ // Wait until the SQS indexing pipeline is also started
333
+ sandbox. wait_for_indexing_pipelines ( 2 ) . await . unwrap ( ) ;
334
+
335
+ // increase the number of pipelines to 3
336
+ let source_config_input = format ! (
337
+ r#"
338
+ version: 0.7
339
+ source_id: {}
340
+ desired_num_pipelines: 3
341
+ max_num_pipelines_per_indexer: 3
342
+ source_type: file
343
+ params:
344
+ notifications:
345
+ - type: sqs
346
+ queue_url: {}
347
+ message_type: raw_uri
348
+ deduplication_window_max_messages: 5
349
+ deduplication_cleanup_interval_secs: 3
350
+ input_format: plain_text
351
+ "# ,
352
+ source_id, queue_url
353
+ ) ;
354
+ sandbox
355
+ . rest_client ( QuickwitService :: Metastore )
356
+ . sources ( index_id)
357
+ . update ( source_id, source_config_input, ConfigFormat :: Yaml )
358
+ . await
359
+ . unwrap ( ) ;
360
+
361
+ // Wait until the SQS indexing pipeline is also started
362
+ sandbox. wait_for_indexing_pipelines ( 4 ) . await . unwrap ( ) ;
363
+
364
+ sandbox. shutdown ( ) . await . unwrap ( ) ;
365
+ }
0 commit comments