Skip to content

Commit abf7723

Browse files
committed
Add unit tests for metastore and control plane
Also include some comment and naming improvments.
1 parent d008f8e commit abf7723

File tree

12 files changed

+252
-32
lines changed

12 files changed

+252
-32
lines changed

quickwit/quickwit-config/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ pub trait TestableForRegression: Serialize + DeserializeOwned {
288288
fn assert_equality(&self, other: &Self);
289289
}
290290

291-
pub fn indexing_params_fingerprint(
291+
/// Return a fingerprint of all parameters that should trigger an indexing pipeline restart.
292+
pub fn indexing_pipeline_params_fingerprint(
292293
index_config: &IndexConfig,
293294
source_config: &SourceConfig,
294295
) -> u64 {

quickwit/quickwit-control-plane/src/control_plane.rs

+71-2
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,9 @@ mod tests {
11241124
use mockall::Sequence;
11251125
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
11261126
use quickwit_cluster::ClusterChangeStreamFactoryForTest;
1127-
use quickwit_config::{IndexConfig, SourceParams, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID};
1127+
use quickwit_config::{
1128+
IndexConfig, SourceParams, TransformConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID,
1129+
};
11281130
use quickwit_indexing::IndexingService;
11291131
use quickwit_metastore::{
11301132
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
@@ -1273,7 +1275,8 @@ mod tests {
12731275
assert_eq!(source_config.source_type(), SourceType::Void);
12741276
true
12751277
})
1276-
.returning(|_| Ok(EmptyResponse {}));
1278+
.return_once(|_| Ok(EmptyResponse {}));
1279+
// the list_indexes_metadata and list_shards calls are made when the control plane starts
12771280
mock_metastore
12781281
.expect_list_indexes_metadata()
12791282
.return_once(move |_| {
@@ -1312,6 +1315,72 @@ mod tests {
13121315
universe.assert_quit().await;
13131316
}
13141317

1318+
#[tokio::test]
1319+
async fn test_control_plane_update_source() {
1320+
let universe = Universe::with_accelerated_time();
1321+
let self_node_id: NodeId = "test-node".into();
1322+
let indexer_pool = IndexerPool::default();
1323+
let ingester_pool = IngesterPool::default();
1324+
1325+
let mut index_metadata = IndexMetadata::for_test("test-index", "ram://tata");
1326+
index_metadata
1327+
.add_source(SourceConfig::ingest_v2())
1328+
.unwrap();
1329+
1330+
let test_source_config = SourceConfig::for_test("test-source", SourceParams::void());
1331+
index_metadata.add_source(test_source_config).unwrap();
1332+
1333+
let mut mock_metastore = MockMetastoreService::new();
1334+
mock_metastore
1335+
.expect_update_source()
1336+
.withf(|update_source_request| {
1337+
let source_config: SourceConfig =
1338+
serde_json::from_str(&update_source_request.source_config_json).unwrap();
1339+
assert_eq!(source_config.source_id, "test-source");
1340+
assert_eq!(source_config.source_type(), SourceType::Void);
1341+
assert!(source_config.transform_config.is_some());
1342+
true
1343+
})
1344+
.return_once(|_| Ok(EmptyResponse {}));
1345+
// the list_indexes_metadata and list_shards calls are made when the control plane starts
1346+
mock_metastore
1347+
.expect_list_indexes_metadata()
1348+
.return_once(move |_| {
1349+
Ok(ListIndexesMetadataResponse::for_test(vec![
1350+
index_metadata.clone()
1351+
]))
1352+
});
1353+
mock_metastore
1354+
.expect_list_shards()
1355+
.return_once(move |_| Ok(ListShardsResponse::default()));
1356+
1357+
let cluster_config = ClusterConfig::for_test();
1358+
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
1359+
let (control_plane_mailbox, _control_plane_handle, _readiness_rx) = ControlPlane::spawn(
1360+
&universe,
1361+
cluster_config,
1362+
self_node_id,
1363+
cluster_change_stream_factory,
1364+
indexer_pool,
1365+
ingester_pool,
1366+
MetastoreServiceClient::from_mock(mock_metastore),
1367+
);
1368+
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
1369+
let mut updated_source_config = SourceConfig::for_test("test-source", SourceParams::void());
1370+
updated_source_config.transform_config =
1371+
Some(TransformConfig::new("del(.username)".to_string(), None));
1372+
let update_source_request = UpdateSourceRequest {
1373+
index_uid: Some(index_uid),
1374+
source_config_json: serde_json::to_string(&updated_source_config).unwrap(),
1375+
};
1376+
control_plane_mailbox
1377+
.ask_for_res(update_source_request)
1378+
.await
1379+
.unwrap();
1380+
1381+
universe.assert_quit().await;
1382+
}
1383+
13151384
#[tokio::test]
13161385
async fn test_control_plane_toggle_source() {
13171386
let universe = Universe::with_accelerated_time();

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use fnv::{FnvHashMap, FnvHashSet};
3030
use itertools::Itertools;
3131
use once_cell::sync::OnceCell;
3232
use quickwit_common::pretty::PrettySample;
33-
use quickwit_config::{indexing_params_fingerprint, FileSourceParams, SourceParams};
33+
use quickwit_config::{indexing_pipeline_params_fingerprint, FileSourceParams, SourceParams};
3434
use quickwit_proto::indexing::{
3535
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
3636
PIPELINE_THROUGHPUT,
@@ -170,7 +170,9 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
170170
}
171171
let params_fingerprint = model
172172
.index_metadata(&source_uid.index_uid)
173-
.map(|index_meta| indexing_params_fingerprint(&index_meta.index_config, source_config))
173+
.map(|index_meta| {
174+
indexing_pipeline_params_fingerprint(&index_meta.index_config, source_config)
175+
})
174176
.unwrap_or_default();
175177
match source_config.source_params {
176178
SourceParams::File(FileSourceParams::Filepath(_))

quickwit/quickwit-control-plane/src/model/mod.rs

+30-1
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ impl ControlPlaneModel {
568568
#[cfg(test)]
569569
mod tests {
570570
use metastore::EmptyResponse;
571-
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
571+
use quickwit_config::{SourceConfig, SourceParams, TransformConfig, INGEST_V2_SOURCE_ID};
572572
use quickwit_metastore::IndexMetadata;
573573
use quickwit_proto::ingest::{Shard, ShardState};
574574
use quickwit_proto::metastore::{ListIndexesMetadataResponse, MockMetastoreService};
@@ -772,6 +772,35 @@ mod tests {
772772
);
773773
}
774774

775+
#[test]
776+
fn test_control_plane_model_update_sources() {
777+
let mut model = ControlPlaneModel::default();
778+
let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///indexes");
779+
let mut my_source = SourceConfig::for_test("my-source", SourceParams::void());
780+
index_metadata.add_source(my_source.clone()).unwrap();
781+
index_metadata
782+
.add_source(SourceConfig::ingest_v2())
783+
.unwrap();
784+
let index_uid = index_metadata.index_uid.clone();
785+
model.add_index(index_metadata.clone());
786+
787+
// Update a source
788+
my_source.transform_config = Some(TransformConfig::new("del(.username)".to_string(), None));
789+
model.update_source(&index_uid, my_source.clone()).unwrap();
790+
791+
assert_eq!(model.index_table.len(), 1);
792+
assert_eq!(
793+
model
794+
.index_table
795+
.get(&index_uid)
796+
.unwrap()
797+
.sources
798+
.get("my-source")
799+
.unwrap(),
800+
&my_source
801+
);
802+
}
803+
775804
#[test]
776805
fn test_control_plane_model_delete_index() {
777806
let mut model = ControlPlaneModel::default();

quickwit/quickwit-indexing/src/actors/indexing_service.rs

+16-13
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use quickwit_common::io::Limiter;
3636
use quickwit_common::pubsub::EventBroker;
3737
use quickwit_common::{io, temp_dir};
3838
use quickwit_config::{
39-
build_doc_mapper, indexing_params_fingerprint, IndexConfig, IndexerConfig, SourceConfig,
40-
INGEST_API_SOURCE_ID,
39+
build_doc_mapper, indexing_pipeline_params_fingerprint, IndexConfig, IndexerConfig,
40+
SourceConfig, INGEST_API_SOURCE_ID,
4141
};
4242
use quickwit_ingest::{
4343
DropQueueRequest, GetPartitionId, IngestApiService, IngesterPool, ListQueuesRequest,
@@ -324,7 +324,8 @@ impl IndexingService {
324324
let max_concurrent_split_uploads_merge =
325325
(self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1);
326326

327-
let params_fingerprint = indexing_params_fingerprint(&index_config, &source_config);
327+
let params_fingerprint =
328+
indexing_pipeline_params_fingerprint(&index_config, &source_config);
328329
if let Some(expected_params_fingerprint) = expected_params_fingerprint {
329330
if params_fingerprint != expected_params_fingerprint {
330331
warn!(
@@ -1220,7 +1221,9 @@ mod tests {
12201221

12211222
#[tokio::test]
12221223
async fn test_indexing_service_apply_plan() {
1223-
const PARAMS_FINGERPRINT: u64 = 3865067856550546352;
1224+
const PARAMS_FINGERPRINT_INGEST_API: u64 = 1637744865450232394;
1225+
const PARAMS_FINGERPRINT_SOURCE_1: u64 = 1705211905504908791;
1226+
const PARAMS_FINGERPRINT_SOURCE_2: u64 = 8706667372658059428;
12241227

12251228
quickwit_common::setup_logging_for_tests();
12261229
let transport = ChannelTransport::default();
@@ -1281,14 +1284,14 @@ mod tests {
12811284
source_id: "test-indexing-service--source-1".to_string(),
12821285
shard_ids: Vec::new(),
12831286
pipeline_uid: Some(PipelineUid::for_test(0u128)),
1284-
params_fingerprint: PARAMS_FINGERPRINT,
1287+
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
12851288
},
12861289
IndexingTask {
12871290
index_uid: Some(metadata.index_uid.clone()),
12881291
source_id: "test-indexing-service--source-1".to_string(),
12891292
shard_ids: Vec::new(),
12901293
pipeline_uid: Some(PipelineUid::for_test(1u128)),
1291-
params_fingerprint: PARAMS_FINGERPRINT,
1294+
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
12921295
},
12931296
];
12941297
indexing_service
@@ -1327,28 +1330,28 @@ mod tests {
13271330
source_id: INGEST_API_SOURCE_ID.to_string(),
13281331
shard_ids: Vec::new(),
13291332
pipeline_uid: Some(PipelineUid::for_test(3u128)),
1330-
params_fingerprint: PARAMS_FINGERPRINT,
1333+
params_fingerprint: PARAMS_FINGERPRINT_INGEST_API,
13311334
},
13321335
IndexingTask {
13331336
index_uid: Some(metadata.index_uid.clone()),
13341337
source_id: "test-indexing-service--source-1".to_string(),
13351338
shard_ids: Vec::new(),
13361339
pipeline_uid: Some(PipelineUid::for_test(1u128)),
1337-
params_fingerprint: PARAMS_FINGERPRINT,
1340+
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
13381341
},
13391342
IndexingTask {
13401343
index_uid: Some(metadata.index_uid.clone()),
13411344
source_id: "test-indexing-service--source-1".to_string(),
13421345
shard_ids: Vec::new(),
13431346
pipeline_uid: Some(PipelineUid::for_test(2u128)),
1344-
params_fingerprint: PARAMS_FINGERPRINT,
1347+
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
13451348
},
13461349
IndexingTask {
13471350
index_uid: Some(metadata.index_uid.clone()),
13481351
source_id: source_config_2.source_id.clone(),
13491352
shard_ids: Vec::new(),
13501353
pipeline_uid: Some(PipelineUid::for_test(4u128)),
1351-
params_fingerprint: PARAMS_FINGERPRINT,
1354+
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_2,
13521355
},
13531356
];
13541357
indexing_service
@@ -1389,21 +1392,21 @@ mod tests {
13891392
source_id: INGEST_API_SOURCE_ID.to_string(),
13901393
shard_ids: Vec::new(),
13911394
pipeline_uid: Some(PipelineUid::for_test(3u128)),
1392-
params_fingerprint: PARAMS_FINGERPRINT,
1395+
params_fingerprint: PARAMS_FINGERPRINT_INGEST_API,
13931396
},
13941397
IndexingTask {
13951398
index_uid: Some(metadata.index_uid.clone()),
13961399
source_id: "test-indexing-service--source-1".to_string(),
13971400
shard_ids: Vec::new(),
13981401
pipeline_uid: Some(PipelineUid::for_test(1u128)),
1399-
params_fingerprint: PARAMS_FINGERPRINT,
1402+
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1,
14001403
},
14011404
IndexingTask {
14021405
index_uid: Some(metadata.index_uid.clone()),
14031406
source_id: source_config_2.source_id.clone(),
14041407
shard_ids: Vec::new(),
14051408
pipeline_uid: Some(PipelineUid::for_test(4u128)),
1406-
params_fingerprint: PARAMS_FINGERPRINT,
1409+
params_fingerprint: PARAMS_FINGERPRINT_SOURCE_2,
14071410
},
14081411
];
14091412
indexing_service

quickwit/quickwit-metastore/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub use metastore::{
5252
IndexMetadataResponseExt, IndexesMetadataResponseExt, ListIndexesMetadataResponseExt,
5353
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
5454
MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt,
55-
UpdateIndexRequestExt,
55+
UpdateIndexRequestExt, UpdateSourceRequestExt,
5656
};
5757
pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore};
5858
pub use metastore_resolver::MetastoreResolver;

quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl IndexMetadata {
163163
if entry.get() == &source_config {
164164
return Ok(false);
165165
}
166-
entry.insert(source_config.clone());
166+
entry.insert(source_config);
167167
Ok(true)
168168
}
169169
Entry::Vacant(_) => Err(MetastoreError::NotFound(EntityKind::Source {

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1086,7 +1086,7 @@ impl MetastoreService for PostgresqlMetastore {
10861086
async fn update_source(&self, request: UpdateSourceRequest) -> MetastoreResult<EmptyResponse> {
10871087
let source_config = request.deserialize_source_config()?;
10881088
let index_uid: IndexUid = request.index_uid().clone();
1089-
run_with_tx!(self.connection_pool, tx, "add source", {
1089+
run_with_tx!(self.connection_pool, tx, "update source", {
10901090
mutate_index_metadata::<MetastoreError, _>(tx, index_uid, |index_metadata| {
10911091
let mutation_occurred = index_metadata.update_source(source_config)?;
10921092
Ok(MutationOccurred::from(mutation_occurred))

quickwit/quickwit-metastore/src/tests/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,13 @@ macro_rules! metastore_test_suite {
375375
$crate::tests::source::test_metastore_add_source::<$metastore_type>().await;
376376
}
377377

378+
#[tokio::test]
379+
#[serial_test::file_serial]
380+
async fn test_metastore_update_source() {
381+
let _ = tracing_subscriber::fmt::try_init();
382+
$crate::tests::source::test_metastore_update_source::<$metastore_type>().await;
383+
}
384+
378385
#[tokio::test]
379386
#[serial_test::file_serial]
380387
async fn test_metastore_toggle_source() {

0 commit comments

Comments
 (0)