Skip to content

Commit 39a8506

Browse files
committed
Add unit tests for metastore and control plane
Also include some comment and naming improvments.
1 parent d008f8e commit 39a8506

File tree

12 files changed

+238
-22
lines changed

12 files changed

+238
-22
lines changed

quickwit/quickwit-config/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ pub trait TestableForRegression: Serialize + DeserializeOwned {
288288
fn assert_equality(&self, other: &Self);
289289
}
290290

291-
pub fn indexing_params_fingerprint(
291+
/// Return a fingerprint of all parameters that should trigger an indexing pipeline restart.
292+
pub fn indexing_pipeline_params_fingerprint(
292293
index_config: &IndexConfig,
293294
source_config: &SourceConfig,
294295
) -> u64 {

quickwit/quickwit-control-plane/src/control_plane.rs

+71-2
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,9 @@ mod tests {
11241124
use mockall::Sequence;
11251125
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
11261126
use quickwit_cluster::ClusterChangeStreamFactoryForTest;
1127-
use quickwit_config::{IndexConfig, SourceParams, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID};
1127+
use quickwit_config::{
1128+
IndexConfig, SourceParams, TransformConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID,
1129+
};
11281130
use quickwit_indexing::IndexingService;
11291131
use quickwit_metastore::{
11301132
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
@@ -1273,7 +1275,8 @@ mod tests {
12731275
assert_eq!(source_config.source_type(), SourceType::Void);
12741276
true
12751277
})
1276-
.returning(|_| Ok(EmptyResponse {}));
1278+
.return_once(|_| Ok(EmptyResponse {}));
1279+
// the list_indexes_metadata and list_shards calls are made when the control plane starts
12771280
mock_metastore
12781281
.expect_list_indexes_metadata()
12791282
.return_once(move |_| {
@@ -1312,6 +1315,72 @@ mod tests {
13121315
universe.assert_quit().await;
13131316
}
13141317

1318+
#[tokio::test]
1319+
async fn test_control_plane_update_source() {
1320+
let universe = Universe::with_accelerated_time();
1321+
let self_node_id: NodeId = "test-node".into();
1322+
let indexer_pool = IndexerPool::default();
1323+
let ingester_pool = IngesterPool::default();
1324+
1325+
let mut index_metadata = IndexMetadata::for_test("test-index", "ram://tata");
1326+
index_metadata
1327+
.add_source(SourceConfig::ingest_v2())
1328+
.unwrap();
1329+
1330+
let test_source_config = SourceConfig::for_test("test-source", SourceParams::void());
1331+
index_metadata.add_source(test_source_config).unwrap();
1332+
1333+
let mut mock_metastore = MockMetastoreService::new();
1334+
mock_metastore
1335+
.expect_update_source()
1336+
.withf(|update_source_request| {
1337+
let source_config: SourceConfig =
1338+
serde_json::from_str(&update_source_request.source_config_json).unwrap();
1339+
assert_eq!(source_config.source_id, "test-source");
1340+
assert_eq!(source_config.source_type(), SourceType::Void);
1341+
assert!(!source_config.transform_config.is_none());
1342+
true
1343+
})
1344+
.return_once(|_| Ok(EmptyResponse {}));
1345+
// the list_indexes_metadata and list_shards calls are made when the control plane starts
1346+
mock_metastore
1347+
.expect_list_indexes_metadata()
1348+
.return_once(move |_| {
1349+
Ok(ListIndexesMetadataResponse::for_test(vec![
1350+
index_metadata.clone()
1351+
]))
1352+
});
1353+
mock_metastore
1354+
.expect_list_shards()
1355+
.return_once(move |_| Ok(ListShardsResponse::default()));
1356+
1357+
let cluster_config = ClusterConfig::for_test();
1358+
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
1359+
let (control_plane_mailbox, _control_plane_handle, _readiness_rx) = ControlPlane::spawn(
1360+
&universe,
1361+
cluster_config,
1362+
self_node_id,
1363+
cluster_change_stream_factory,
1364+
indexer_pool,
1365+
ingester_pool,
1366+
MetastoreServiceClient::from_mock(mock_metastore),
1367+
);
1368+
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
1369+
let mut updated_source_config = SourceConfig::for_test("test-source", SourceParams::void());
1370+
updated_source_config.transform_config =
1371+
Some(TransformConfig::new("del(.username)".to_string(), None));
1372+
let update_source_request = UpdateSourceRequest {
1373+
index_uid: Some(index_uid),
1374+
source_config_json: serde_json::to_string(&updated_source_config).unwrap(),
1375+
};
1376+
control_plane_mailbox
1377+
.ask_for_res(update_source_request)
1378+
.await
1379+
.unwrap();
1380+
1381+
universe.assert_quit().await;
1382+
}
1383+
13151384
#[tokio::test]
13161385
async fn test_control_plane_toggle_source() {
13171386
let universe = Universe::with_accelerated_time();

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use fnv::{FnvHashMap, FnvHashSet};
3030
use itertools::Itertools;
3131
use once_cell::sync::OnceCell;
3232
use quickwit_common::pretty::PrettySample;
33-
use quickwit_config::{indexing_params_fingerprint, FileSourceParams, SourceParams};
33+
use quickwit_config::{indexing_pipeline_params_fingerprint, FileSourceParams, SourceParams};
3434
use quickwit_proto::indexing::{
3535
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
3636
PIPELINE_THROUGHPUT,
@@ -170,7 +170,7 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
170170
}
171171
let params_fingerprint = model
172172
.index_metadata(&source_uid.index_uid)
173-
.map(|index_meta| indexing_params_fingerprint(&index_meta.index_config, source_config))
173+
.map(|index_meta| indexing_pipeline_params_fingerprint(&index_meta.index_config, source_config))
174174
.unwrap_or_default();
175175
match source_config.source_params {
176176
SourceParams::File(FileSourceParams::Filepath(_))

quickwit/quickwit-control-plane/src/model/mod.rs

+30-1
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,7 @@ impl ControlPlaneModel {
568568
#[cfg(test)]
569569
mod tests {
570570
use metastore::EmptyResponse;
571-
use quickwit_config::{SourceConfig, SourceParams, INGEST_V2_SOURCE_ID};
571+
use quickwit_config::{SourceConfig, SourceParams, TransformConfig, INGEST_V2_SOURCE_ID};
572572
use quickwit_metastore::IndexMetadata;
573573
use quickwit_proto::ingest::{Shard, ShardState};
574574
use quickwit_proto::metastore::{ListIndexesMetadataResponse, MockMetastoreService};
@@ -772,6 +772,35 @@ mod tests {
772772
);
773773
}
774774

775+
#[test]
776+
fn test_control_plane_model_update_sources() {
777+
let mut model = ControlPlaneModel::default();
778+
let mut index_metadata = IndexMetadata::for_test("test-index", "ram:///indexes");
779+
let mut my_source = SourceConfig::for_test("my-source", SourceParams::void());
780+
index_metadata.add_source(my_source.clone()).unwrap();
781+
index_metadata
782+
.add_source(SourceConfig::ingest_v2())
783+
.unwrap();
784+
let index_uid = index_metadata.index_uid.clone();
785+
model.add_index(index_metadata.clone());
786+
787+
// Update a source
788+
my_source.transform_config = Some(TransformConfig::new("del(.username)".to_string(), None));
789+
model.update_source(&index_uid, my_source.clone()).unwrap();
790+
791+
assert_eq!(model.index_table.len(), 1);
792+
assert_eq!(
793+
model
794+
.index_table
795+
.get(&index_uid)
796+
.unwrap()
797+
.sources
798+
.get("my-source")
799+
.unwrap(),
800+
&my_source
801+
);
802+
}
803+
775804
#[test]
776805
fn test_control_plane_model_delete_index() {
777806
let mut model = ControlPlaneModel::default();

quickwit/quickwit-indexing/src/actors/indexing_service.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use quickwit_common::io::Limiter;
3636
use quickwit_common::pubsub::EventBroker;
3737
use quickwit_common::{io, temp_dir};
3838
use quickwit_config::{
39-
build_doc_mapper, indexing_params_fingerprint, IndexConfig, IndexerConfig, SourceConfig,
40-
INGEST_API_SOURCE_ID,
39+
build_doc_mapper, indexing_pipeline_params_fingerprint, IndexConfig, IndexerConfig,
40+
SourceConfig, INGEST_API_SOURCE_ID,
4141
};
4242
use quickwit_ingest::{
4343
DropQueueRequest, GetPartitionId, IngestApiService, IngesterPool, ListQueuesRequest,
@@ -324,7 +324,8 @@ impl IndexingService {
324324
let max_concurrent_split_uploads_merge =
325325
(self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1);
326326

327-
let params_fingerprint = indexing_params_fingerprint(&index_config, &source_config);
327+
let params_fingerprint =
328+
indexing_pipeline_params_fingerprint(&index_config, &source_config);
328329
if let Some(expected_params_fingerprint) = expected_params_fingerprint {
329330
if params_fingerprint != expected_params_fingerprint {
330331
warn!(

quickwit/quickwit-metastore/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub use metastore::{
5252
IndexMetadataResponseExt, IndexesMetadataResponseExt, ListIndexesMetadataResponseExt,
5353
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
5454
MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt,
55-
UpdateIndexRequestExt,
55+
UpdateIndexRequestExt, UpdateSourceRequestExt,
5656
};
5757
pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore};
5858
pub use metastore_resolver::MetastoreResolver;

quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl IndexMetadata {
163163
if entry.get() == &source_config {
164164
return Ok(false);
165165
}
166-
entry.insert(source_config.clone());
166+
entry.insert(source_config);
167167
Ok(true)
168168
}
169169
Entry::Vacant(_) => Err(MetastoreError::NotFound(EntityKind::Source {

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1086,7 +1086,7 @@ impl MetastoreService for PostgresqlMetastore {
10861086
async fn update_source(&self, request: UpdateSourceRequest) -> MetastoreResult<EmptyResponse> {
10871087
let source_config = request.deserialize_source_config()?;
10881088
let index_uid: IndexUid = request.index_uid().clone();
1089-
run_with_tx!(self.connection_pool, tx, "add source", {
1089+
run_with_tx!(self.connection_pool, tx, "update source", {
10901090
mutate_index_metadata::<MetastoreError, _>(tx, index_uid, |index_metadata| {
10911091
let mutation_occurred = index_metadata.update_source(source_config)?;
10921092
Ok(MutationOccurred::from(mutation_occurred))

quickwit/quickwit-metastore/src/tests/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,13 @@ macro_rules! metastore_test_suite {
375375
$crate::tests::source::test_metastore_add_source::<$metastore_type>().await;
376376
}
377377

378+
#[tokio::test]
379+
#[serial_test::file_serial]
380+
async fn test_metastore_update_source() {
381+
let _ = tracing_subscriber::fmt::try_init();
382+
$crate::tests::source::test_metastore_update_source::<$metastore_type>().await;
383+
}
384+
378385
#[tokio::test]
379386
#[serial_test::file_serial]
380387
async fn test_metastore_toggle_source() {

quickwit/quickwit-metastore/src/tests/source.rs

+111-2
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@
2020
use std::num::NonZeroUsize;
2121

2222
use quickwit_common::rand::append_random_suffix;
23-
use quickwit_config::{IndexConfig, SourceConfig, SourceInputFormat, SourceParams};
23+
use quickwit_config::{
24+
IndexConfig, SourceConfig, SourceInputFormat, SourceParams, TransformConfig,
25+
};
2426
use quickwit_proto::metastore::{
2527
AddSourceRequest, CreateIndexRequest, DeleteSourceRequest, EntityKind, IndexMetadataRequest,
2628
MetastoreError, PublishSplitsRequest, ResetSourceCheckpointRequest, SourceType,
27-
StageSplitsRequest, ToggleSourceRequest,
29+
StageSplitsRequest, ToggleSourceRequest, UpdateSourceRequest,
2830
};
2931
use quickwit_proto::types::IndexUid;
3032

3133
use super::DefaultForTest;
3234
use crate::checkpoint::SourceCheckpoint;
35+
use crate::metastore::UpdateSourceRequestExt;
3336
use crate::tests::cleanup_index;
3437
use crate::{
3538
AddSourceRequestExt, CreateIndexRequestExt, IndexMetadataResponseExt, MetastoreServiceExt,
@@ -136,6 +139,112 @@ pub async fn test_metastore_add_source<MetastoreToTest: MetastoreServiceExt + De
136139
cleanup_index(&mut metastore, index_uid).await;
137140
}
138141

142+
pub async fn test_metastore_update_source<MetastoreToTest: MetastoreServiceExt + DefaultForTest>() {
143+
let mut metastore = MetastoreToTest::default_for_test().await;
144+
145+
let index_id = append_random_suffix("test-add-source");
146+
let index_uri = format!("ram:///indexes/{index_id}");
147+
let index_config = IndexConfig::for_test(&index_id, &index_uri);
148+
149+
let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap();
150+
let index_uid: IndexUid = metastore
151+
.create_index(create_index_request)
152+
.await
153+
.unwrap()
154+
.index_uid()
155+
.clone();
156+
157+
let source_id = format!("{index_id}--source");
158+
159+
let mut source = SourceConfig {
160+
source_id: source_id.to_string(),
161+
num_pipelines: NonZeroUsize::new(1).unwrap(),
162+
enabled: true,
163+
source_params: SourceParams::void(),
164+
transform_config: None,
165+
input_format: SourceInputFormat::Json,
166+
};
167+
168+
assert_eq!(
169+
metastore
170+
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string()))
171+
.await
172+
.unwrap()
173+
.deserialize_index_metadata()
174+
.unwrap()
175+
.checkpoint
176+
.source_checkpoint(&source_id),
177+
None
178+
);
179+
180+
let add_source_request =
181+
AddSourceRequest::try_from_source_config(index_uid.clone(), &source).unwrap();
182+
metastore.add_source(add_source_request).await.unwrap();
183+
184+
source.transform_config = Some(TransformConfig::new("del(.username)".to_string(), None));
185+
186+
// Update the source twice with the same value to validate indempotency
187+
for _ in 0..2 {
188+
let update_source_request =
189+
UpdateSourceRequest::try_from_source_config(index_uid.clone(), &source).unwrap();
190+
metastore
191+
.update_source(update_source_request)
192+
.await
193+
.unwrap();
194+
195+
let index_metadata = metastore
196+
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string()))
197+
.await
198+
.unwrap()
199+
.deserialize_index_metadata()
200+
.unwrap();
201+
202+
let sources = &index_metadata.sources;
203+
assert_eq!(sources.len(), 1);
204+
assert!(sources.contains_key(&source_id));
205+
assert_eq!(sources.get(&source_id).unwrap().source_id, source_id);
206+
assert_eq!(
207+
sources.get(&source_id).unwrap().source_type(),
208+
SourceType::Void
209+
);
210+
assert_eq!(
211+
sources.get(&source_id).unwrap().transform_config,
212+
Some(TransformConfig::new("del(.username)".to_string(), None))
213+
);
214+
assert_eq!(
215+
index_metadata.checkpoint.source_checkpoint(&source_id),
216+
Some(&SourceCheckpoint::default())
217+
);
218+
}
219+
220+
source.source_id = "unknown-src-id".to_string();
221+
assert!(matches!(
222+
metastore
223+
.update_source(
224+
UpdateSourceRequest::try_from_source_config(index_uid.clone(), &source).unwrap()
225+
)
226+
.await
227+
.unwrap_err(),
228+
MetastoreError::NotFound(EntityKind::Source { .. })
229+
));
230+
source.source_id = source_id;
231+
assert!(matches!(
232+
metastore
233+
.add_source(
234+
AddSourceRequest::try_from_source_config(
235+
IndexUid::new_with_random_ulid("index-not-found"),
236+
&source
237+
)
238+
.unwrap()
239+
)
240+
.await
241+
.unwrap_err(),
242+
MetastoreError::NotFound(EntityKind::Index { .. })
243+
));
244+
245+
cleanup_index(&mut metastore, index_uid).await;
246+
}
247+
139248
pub async fn test_metastore_toggle_source<MetastoreToTest: MetastoreServiceExt + DefaultForTest>() {
140249
let mut metastore = MetastoreToTest::default_for_test().await;
141250

quickwit/quickwit-proto/protos/quickwit/metastore.proto

+2-2
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,10 @@ service MetastoreService {
130130
// Adds a source.
131131
rpc AddSource(AddSourceRequest) returns (EmptyResponse);
132132

133-
// Update a source.
133+
// Updates a source.
134134
rpc UpdateSource(UpdateSourceRequest) returns (EmptyResponse);
135135

136-
// Toggles source.
136+
// Toggles (turns on or off) source.
137137
rpc ToggleSource(ToggleSourceRequest) returns (EmptyResponse);
138138

139139
// Removes source.

0 commit comments

Comments
 (0)