Skip to content

Commit 43b778f

Browse files
committed
Add endpoint
1 parent 10ad1e3 commit 43b778f

File tree

7 files changed

+267
-22
lines changed

7 files changed

+267
-22
lines changed

quickwit/quickwit-config/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ use serde_json::Value as JsonValue;
5959
use siphasher::sip::SipHasher;
6060
use source_config::FileSourceParamsForSerde;
6161
pub use source_config::{
62-
load_source_config_from_user_config, FileSourceMessageType, FileSourceNotification,
63-
FileSourceParams, FileSourceSqs, KafkaSourceParams, KinesisSourceParams, PubSubSourceParams,
64-
PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat,
65-
SourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_SOURCE_ID,
66-
INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
62+
load_source_config_from_user_config, load_source_config_update, FileSourceMessageType,
63+
FileSourceNotification, FileSourceParams, FileSourceSqs, KafkaSourceParams,
64+
KinesisSourceParams, PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams,
65+
RegionOrEndpoint, SourceConfig, SourceInputFormat, SourceParams, TransformConfig,
66+
VecSourceParams, VoidSourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
6767
};
6868
use tracing::warn;
6969

quickwit/quickwit-config/src/source_config/mod.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ use regex::Regex;
3333
use serde::de::Error;
3434
use serde::{Deserialize, Deserializer, Serialize};
3535
use serde_json::Value as JsonValue;
36-
pub use serialize::load_source_config_from_user_config;
3736
// For backward compatibility.
3837
use serialize::VersionedSourceConfig;
38+
pub use serialize::{load_source_config_from_user_config, load_source_config_update};
3939
use siphasher::sip::SipHasher;
4040

4141
use crate::{disable_ingest_v1, enable_ingest_v2};
@@ -632,6 +632,7 @@ impl TransformConfig {
632632

633633
#[cfg(test)]
634634
mod tests {
635+
use std::num::NonZero;
635636
use std::str::FromStr;
636637

637638
use quickwit_common::uri::Uri;
@@ -1392,4 +1393,46 @@ mod tests {
13921393
.unwrap();
13931394
assert_eq!(source_config.input_format, SourceInputFormat::PlainText);
13941395
}
1396+
1397+
#[tokio::test]
1398+
async fn test_update_kafka_source_config() {
1399+
let source_config_filepath = get_source_config_filepath("kafka-source.json");
1400+
let file_content = std::fs::read(&source_config_filepath).unwrap();
1401+
let source_config_uri = Uri::from_str(&source_config_filepath).unwrap();
1402+
let config_format = ConfigFormat::sniff_from_uri(&source_config_uri).unwrap();
1403+
let mut existing_source_config =
1404+
load_source_config_from_user_config(config_format, &file_content).unwrap();
1405+
existing_source_config.num_pipelines = NonZero::new(4).unwrap();
1406+
let new_source_config =
1407+
load_source_config_update(config_format, &file_content, &existing_source_config)
1408+
.unwrap();
1409+
1410+
let expected_source_config = SourceConfig {
1411+
source_id: "hdfs-logs-kafka-source".to_string(),
1412+
num_pipelines: NonZeroUsize::new(2).unwrap(),
1413+
enabled: true,
1414+
source_params: SourceParams::Kafka(KafkaSourceParams {
1415+
topic: "cloudera-cluster-logs".to_string(),
1416+
client_log_level: None,
1417+
client_params: json! {{"bootstrap.servers": "localhost:9092"}},
1418+
enable_backfill_mode: false,
1419+
}),
1420+
transform_config: Some(TransformConfig {
1421+
vrl_script: ".message = downcase(string!(.message))".to_string(),
1422+
timezone: "local".to_string(),
1423+
}),
1424+
input_format: SourceInputFormat::Json,
1425+
};
1426+
assert_eq!(new_source_config, expected_source_config);
1427+
assert_eq!(new_source_config.num_pipelines.get(), 2);
1428+
1429+
// the source type cannot be updated
1430+
existing_source_config.source_params = SourceParams::Kinesis(KinesisSourceParams {
1431+
stream_name: "my-stream".to_string(),
1432+
region_or_endpoint: None,
1433+
enable_backfill_mode: false,
1434+
});
1435+
load_source_config_update(config_format, &file_content, &existing_source_config)
1436+
.unwrap_err();
1437+
}
13951438
}

quickwit/quickwit-config/src/source_config/serialize.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
use std::num::NonZeroUsize;
2121

22-
use anyhow::bail;
22+
use anyhow::{bail, ensure};
2323
use quickwit_proto::types::SourceId;
2424
use serde::{Deserialize, Serialize};
2525

@@ -64,6 +64,32 @@ pub fn load_source_config_from_user_config(
6464
source_config_for_serialization.validate_and_build()
6565
}
6666

67+
pub fn load_source_config_update(
68+
config_format: ConfigFormat,
69+
config_content: &[u8],
70+
current_source_config: &SourceConfig,
71+
) -> anyhow::Result<SourceConfig> {
72+
let versioned_source_config: VersionedSourceConfig = config_format.parse(config_content)?;
73+
let source_config_for_serialization: SourceConfigForSerialization =
74+
versioned_source_config.into();
75+
let new_source_config = source_config_for_serialization.validate_and_build()?;
76+
77+
ensure!(
78+
current_source_config.source_id == new_source_config.source_id,
79+
"existing `source_id` {} does not match updated `source_id` {}",
80+
current_source_config.source_id,
81+
new_source_config.source_id
82+
);
83+
84+
ensure!(
85+
current_source_config.source_type() == new_source_config.source_type(),
86+
"source type cannot be updated, current type: {}",
87+
current_source_config.source_type(),
88+
);
89+
90+
Ok(new_source_config)
91+
}
92+
6793
impl SourceConfigForSerialization {
6894
/// Checks the validity of the `SourceConfig` as a "deserializable source".
6995
///

quickwit/quickwit-index-management/src/index.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ use quickwit_indexing::check_source_connectivity;
3131
use quickwit_metastore::{
3232
AddSourceRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt,
3333
ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
34-
MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState,
34+
MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState, UpdateSourceRequestExt,
3535
};
3636
use quickwit_proto::metastore::{
3737
serde_utils, AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, EntityKind,
3838
IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest,
3939
MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
40-
ResetSourceCheckpointRequest,
40+
ResetSourceCheckpointRequest, UpdateSourceRequest,
4141
};
4242
use quickwit_proto::types::{IndexUid, SplitId};
4343
use quickwit_proto::{ServiceError, ServiceErrorCode};
@@ -481,6 +481,40 @@ impl IndexService {
481481
Ok(source)
482482
}
483483

484+
/// Updates a source from an index identified by its UID.
485+
pub async fn update_source(
486+
&mut self,
487+
index_uid: IndexUid,
488+
source_config: SourceConfig,
489+
) -> Result<SourceConfig, IndexServiceError> {
490+
let source_id = source_config.source_id.clone();
491+
check_source_connectivity(&self.storage_resolver, &source_config)
492+
.await
493+
.map_err(IndexServiceError::InvalidConfig)?;
494+
let update_source_request =
495+
UpdateSourceRequest::try_from_source_config(index_uid.clone(), &source_config)?;
496+
self.metastore.update_source(update_source_request).await?;
497+
info!(
498+
"source `{}` successfully updated for index `{}`",
499+
source_id, index_uid.index_id,
500+
);
501+
let index_metadata_request = IndexMetadataRequest::for_index_id(index_uid.index_id);
502+
let source = self
503+
.metastore
504+
.index_metadata(index_metadata_request)
505+
.await?
506+
.deserialize_index_metadata()?
507+
.sources
508+
.get(&source_id)
509+
.ok_or_else(|| {
510+
IndexServiceError::Internal(
511+
"created source is not in index metadata, this should never happen".to_string(),
512+
)
513+
})?
514+
.clone();
515+
Ok(source)
516+
}
517+
484518
pub async fn get_source(
485519
&mut self,
486520
index_id: &str,

quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl MetastoreService for ControlPlaneMetastore {
111111
}
112112

113113
async fn toggle_source(&self, request: ToggleSourceRequest) -> MetastoreResult<EmptyResponse> {
114-
let response = self.control_plane.clone().toggle_source(request).await?;
114+
let response = self.control_plane.toggle_source(request).await?;
115115
Ok(response)
116116
}
117117

quickwit/quickwit-serve/src/index_api/rest_handler.rs

Lines changed: 91 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ use super::index_resource::{
3737
};
3838
use super::source_resource::{
3939
__path_create_source, __path_delete_source, __path_reset_source_checkpoint,
40-
__path_toggle_source, create_source_handler, delete_source_handler, get_source_handler,
41-
get_source_shards_handler, reset_source_checkpoint_handler, toggle_source_handler,
42-
ToggleSource,
40+
__path_toggle_source, __path_update_source, create_source_handler, delete_source_handler,
41+
get_source_handler, get_source_shards_handler, reset_source_checkpoint_handler,
42+
toggle_source_handler, update_source_handler, ToggleSource,
4343
};
4444
use super::split_resource::{
4545
__path_list_splits, __path_mark_splits_for_deletion, list_splits_handler,
@@ -62,6 +62,7 @@ use crate::simple_list::from_simple_list;
6262
describe_index,
6363
mark_splits_for_deletion,
6464
create_source,
65+
update_source,
6566
reset_source_checkpoint,
6667
toggle_source,
6768
delete_source,
@@ -107,6 +108,7 @@ pub fn index_management_handlers(
107108
.or(reset_source_checkpoint_handler(index_service.metastore()))
108109
.or(toggle_source_handler(index_service.metastore()))
109110
.or(create_source_handler(index_service.clone()))
111+
.or(update_source_handler(index_service.clone()))
110112
.or(get_source_handler(index_service.metastore()))
111113
.or(delete_source_handler(index_service.metastore()))
112114
.or(get_source_shards_handler(index_service.metastore()))
@@ -1109,6 +1111,92 @@ mod tests {
11091111
}
11101112
}
11111113

1114+
#[tokio::test]
1115+
async fn test_update_source() {
1116+
let metastore = metastore_for_test();
1117+
let index_service = IndexService::new(metastore.clone(), StorageResolver::unconfigured());
1118+
let mut node_config = NodeConfig::for_test();
1119+
node_config.default_index_root_uri = Uri::for_test("file:///default-index-root-uri");
1120+
let index_management_handler =
1121+
super::index_management_handlers(index_service, Arc::new(node_config));
1122+
let resp = warp::test::request()
1123+
.path("/indexes")
1124+
.method("POST")
1125+
.json(&true)
1126+
.body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"field_mappings":[{"name": "timestamp", "type": "i64", "fast": true, "indexed": true}]}}"#)
1127+
.reply(&index_management_handler)
1128+
.await;
1129+
assert_eq!(resp.status(), 200);
1130+
let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
1131+
let expected_response_json = serde_json::json!({
1132+
"index_config": {
1133+
"index_id": "hdfs-logs",
1134+
"index_uri": "file:///default-index-root-uri/hdfs-logs",
1135+
}
1136+
});
1137+
assert_json_include!(actual: resp_json, expected: expected_response_json);
1138+
1139+
// Create source.
1140+
let source_config_body = r#"{"version": "0.7", "source_id": "vec-source", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 10}}"#;
1141+
let resp = warp::test::request()
1142+
.path("/indexes/hdfs-logs/sources")
1143+
.method("POST")
1144+
.json(&true)
1145+
.body(source_config_body)
1146+
.reply(&index_management_handler)
1147+
.await;
1148+
assert_eq!(resp.status(), 200);
1149+
1150+
// Update the source.
1151+
let update_source_config_body = r#"{"version": "0.7", "source_id": "vec-source", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 20}}"#;
1152+
let resp = warp::test::request()
1153+
.path("/indexes/hdfs-logs/sources/vec-source")
1154+
.method("PUT")
1155+
.json(&true)
1156+
.body(update_source_config_body)
1157+
.reply(&index_management_handler)
1158+
.await;
1159+
assert_eq!(resp.status(), 200);
1160+
// Check that the source has been updated.
1161+
let index_metadata = metastore
1162+
.index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string()))
1163+
.await
1164+
.unwrap()
1165+
.deserialize_index_metadata()
1166+
.unwrap();
1167+
assert!(index_metadata.sources.contains_key("vec-source"));
1168+
let source_config = index_metadata.sources.get("vec-source").unwrap();
1169+
assert_eq!(source_config.source_type(), SourceType::Vec);
1170+
assert_eq!(
1171+
source_config.source_params,
1172+
SourceParams::Vec(VecSourceParams {
1173+
docs: Vec::new(),
1174+
batch_num_docs: 20,
1175+
partition: "".to_string(),
1176+
})
1177+
);
1178+
1179+
// Update the source with a different source_id (forbidden)
1180+
let update_source_config_body = r#"{"version": "0.7", "source_id": "other-source-id", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 20}}"#;
1181+
let resp = warp::test::request()
1182+
.path("/indexes/hdfs-logs/sources/vec-source")
1183+
.method("PUT")
1184+
.json(&true)
1185+
.body(update_source_config_body)
1186+
.reply(&index_management_handler)
1187+
.await;
1188+
assert_eq!(resp.status(), 400);
1189+
// Check that the source hasn't been updated.
1190+
let index_metadata = metastore
1191+
.index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string()))
1192+
.await
1193+
.unwrap()
1194+
.deserialize_index_metadata()
1195+
.unwrap();
1196+
assert!(index_metadata.sources.contains_key("vec-source"));
1197+
assert!(!index_metadata.sources.contains_key("other-source-id"));
1198+
}
1199+
11121200
#[tokio::test]
11131201
async fn test_delete_non_existing_source() {
11141202
let mut mock_metastore = MockMetastoreService::new();
@@ -1244,13 +1332,6 @@ mod tests {
12441332
let index_management_handler =
12451333
super::index_management_handlers(index_service, Arc::new(NodeConfig::for_test()))
12461334
.recover(recover_fn);
1247-
// Check server returns 405 if sources root path is used.
1248-
let resp = warp::test::request()
1249-
.path("/indexes/quickwit-demo-index/sources/source-to-toggle")
1250-
.method("PUT")
1251-
.reply(&index_management_handler)
1252-
.await;
1253-
assert_eq!(resp.status(), 405);
12541335
let resp = warp::test::request()
12551336
.path("/indexes/quickwit-demo-index/sources/source-to-toggle/toggle")
12561337
.method("PUT")

0 commit comments

Comments
 (0)