Commit ffd816b

Add update source API (#5636)
1 parent 9e206bc commit ffd816b

41 files changed: +3011 -1164 lines changed


docs/operating/upgrades.md

+1-1
@@ -23,4 +23,4 @@ No migration is done if `otel-traces-v0_7` already exists. If you want `service_
 
 Quickwit 0.9 introduces a new ingestion service to power the ingest and bulk APIs (v2). The new ingest is enabled and used by default, even though the legacy one (v1) remains enabled to finish indexing residual data in the legacy write-ahead logs. Note that `ingest_api.max_queue_disk_usage` is enforced on both ingest versions separately, which means that the cumulative disk usage might be up to twice this limit.
 
-The control plane should be upgraded first in order to enable the new ingest source (v2) on all existing indexes. Data ingested into previously existing indexes on upgraded indexer nodes will not be picked up by the indexing pipelines until the control plane is upgraded.
+The control plane should be upgraded first in order to enable the new ingest source (v2) on all existing indexes. Data ingested into previously existing indexes on upgraded indexer nodes will not be picked up by the indexing pipelines until the control plane is upgraded. Because the indexing plan is computed differently in 0.9, all pipelines will be restarted when upgrading the control plane. If possible, we recommend avoiding rolling upgrades for indexers. Instead, scale down the number of indexers to zero first, then upgrade the control plane, and finally scale the upgraded indexers back up.

docs/reference/cli.md

+21
@@ -493,6 +493,27 @@ quickwit source create
 |-----------------|-------------|
 | `--index` | ID of the target index |
 | `--source-config` | Path to source config file. Please, refer to the documentation for more details. |
+### source update
+
+Update an existing source.
+`quickwit source update [args]`
+
+*Synopsis*
+
+```bash
+quickwit source update
+    --index <index>
+    --source <source>
+    --source-config <source-config>
+```
+
+*Options*
+
+| Option | Description |
+|-----------------|-------------|
+| `--index` | ID of the target index |
+| `--source` | ID of the source |
+| `--source-config` | Path to source config file. Please, refer to the documentation for more details. |
 ### source enable
 
 Enables a source for an index.
docs/reference/rest-api.md

+49-1
@@ -606,10 +606,58 @@ Create source by posting a source config JSON payload.
 | `version` | `String` | Config format version, put your current Quickwit version. | _required_ |
 | `source_id` | `String` | Source ID. See ID [validation rules](../configuration/source-config.md). | _required_ |
 | `source_type` | `String` | Source type: `kafka`, `kinesis` or `pulsar`. | _required_ |
-| `num_pipelines` | `usize` | Number of running indexing pipelines per node for this source. | 1 |
+| `num_pipelines` | `usize` | Number of running indexing pipelines per node for this source. | `1` |
+| `transform` | `object` | A [VRL](https://vector.dev/docs/reference/vrl/) transformation applied to incoming documents, as defined in [source config docs](../configuration/source-config.md#transform-parameters). | `null` |
 | `params` | `object` | Source parameters as defined in [source config docs](../configuration/source-config.md). | _required_ |
 
 
+**Payload Example**
+
+curl -XPOST http://localhost:7280/api/v1/indexes/my-index/sources --data @source_config.json -H "Content-Type: application/json"
+
+```json title="source_config.json"
+{
+    "version": "0.8",
+    "source_id": "kafka-source",
+    "source_type": "kafka",
+    "params": {
+        "topic": "quickwit-fts-staging",
+        "client_params": {
+            "bootstrap.servers": "kafka-quickwit-server:9092"
+        }
+    }
+}
+```
+
+#### Response
+
+The response is the created source config, and the content type is `application/json; charset=UTF-8`.
+
+### Update a source
+
+```
+PUT api/v1/indexes/<index id>/sources/<source id>
+```
+
+Update a source by sending a source config JSON payload.
+
+#### PUT payload
+
+| Variable | Type | Description | Default value |
+|-------------------|----------|----------------------------------------------------------------------------------------|---------------|
+| `version` | `String` | Config format version, put your current Quickwit version. | _required_ |
+| `source_id` | `String` | Source ID, must be the same source as in the request URL. | _required_ |
+| `source_type` | `String` | Source type: `kafka`, `kinesis` or `pulsar`. Cannot be updated. | _required_ |
+| `num_pipelines` | `usize` | Number of running indexing pipelines per node for this source. | `1` |
+| `transform` | `object` | A [VRL](https://vector.dev/docs/reference/vrl/) transformation applied to incoming documents, as defined in [source config docs](../configuration/source-config.md#transform-parameters). | `null` |
+| `params` | `object` | Source parameters as defined in [source config docs](../configuration/source-config.md). | _required_ |
+
+:::warning
+
+While updating `num_pipelines` and `transform` is generally safe and reversible, updating `params` has consequences specific to the source type and might have side effects such as losing the source's checkpoints. Perform such updates with great care.
+
+:::
+
 **Payload Example**
 
 curl -XPOST http://localhost:7280/api/v1/indexes/my-index/sources --data @source_config.json -H "Content-Type: application/json"
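
The two constraints stated in the PUT payload table (the `source_id` must match the one in the request URL, and the `source_type` cannot be updated) can be pictured as a pre-check run before an update is accepted. The block below is only a minimal sketch under stated assumptions: `SourceSummary` and `check_source_update` are hypothetical names introduced for illustration and are not taken from this commit.

```rust
/// Hypothetical, trimmed-down view of a source config (illustration only).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceSummary {
    pub source_id: String,
    pub source_type: String,
    pub num_pipelines: usize,
}

/// Checks the two constraints listed in the PUT payload table:
/// the payload's source ID must match the URL, and the source type is immutable.
pub fn check_source_update(
    url_source_id: &str,
    current: &SourceSummary,
    update: &SourceSummary,
) -> Result<(), String> {
    if update.source_id != url_source_id {
        return Err(format!(
            "source ID `{}` in payload does not match `{}` in URL",
            update.source_id, url_source_id
        ));
    }
    if update.source_type != current.source_type {
        return Err(format!(
            "source type cannot be updated (`{}` to `{}`)",
            current.source_type, update.source_type
        ));
    }
    // Other fields (e.g. `num_pipelines`) are allowed to change.
    Ok(())
}
```

Changing only `num_pipelines` passes this check, while a mismatched ID or a changed `source_type` is rejected.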

quickwit/Cargo.lock

+1
Generated file, not rendered by default.

quickwit/quickwit-cli/src/source.rs

+88
@@ -45,6 +45,20 @@ pub fn build_source_command() -> Command {
                         .required(true),
                 ])
             )
+        .subcommand(
+            Command::new("update")
+                .about("Updates an existing source.")
+                .args(&[
+                    arg!(--index <INDEX_ID> "ID of the target index")
+                        .display_order(1)
+                        .required(true),
+                    arg!(--source <SOURCE_ID> "ID of the source")
+                        .display_order(2)
+                        .required(true),
+                    arg!(--"source-config" <SOURCE_CONFIG> "Path to source config file. Please, refer to the documentation for more details.")
+                        .required(true),
+                ])
+            )
         .subcommand(
             Command::new("enable")
                 .about("Enables a source for an index.")
@@ -142,6 +156,14 @@ pub struct CreateSourceArgs {
     pub source_config_uri: Uri,
 }
 
+#[derive(Debug, Eq, PartialEq)]
+pub struct UpdateSourceArgs {
+    pub client_args: ClientArgs,
+    pub index_id: IndexId,
+    pub source_id: SourceId,
+    pub source_config_uri: Uri,
+}
+
 #[derive(Debug, Eq, PartialEq)]
 pub struct ToggleSourceArgs {
     pub client_args: ClientArgs,
@@ -182,6 +204,7 @@ pub struct ResetCheckpointArgs {
 #[derive(Debug, Eq, PartialEq)]
 pub enum SourceCliCommand {
     CreateSource(CreateSourceArgs),
+    UpdateSource(UpdateSourceArgs),
     ToggleSource(ToggleSourceArgs),
     DeleteSource(DeleteSourceArgs),
     DescribeSource(DescribeSourceArgs),
@@ -193,6 +216,7 @@ impl SourceCliCommand {
     pub async fn execute(self) -> anyhow::Result<()> {
         match self {
             Self::CreateSource(args) => create_source_cli(args).await,
+            Self::UpdateSource(args) => update_source_cli(args).await,
             Self::ToggleSource(args) => toggle_source_cli(args).await,
             Self::DeleteSource(args) => delete_source_cli(args).await,
             Self::DescribeSource(args) => describe_source_cli(args).await,
@@ -207,6 +231,7 @@ impl SourceCliCommand {
             .context("failed to parse source subcommand")?;
         match subcommand.as_str() {
             "create" => Self::parse_create_args(submatches).map(Self::CreateSource),
+            "update" => Self::parse_update_args(submatches).map(Self::UpdateSource),
             "enable" => {
                 Self::parse_toggle_source_args(&subcommand, submatches).map(Self::ToggleSource)
             }
@@ -239,6 +264,26 @@ impl SourceCliCommand {
         })
     }
 
+    fn parse_update_args(mut matches: ArgMatches) -> anyhow::Result<UpdateSourceArgs> {
+        let client_args = ClientArgs::parse(&mut matches)?;
+        let index_id = matches
+            .remove_one::<String>("index")
+            .expect("`index` should be a required arg.");
+        let source_id = matches
+            .remove_one::<String>("source")
+            .expect("`source` should be a required arg.");
+        let source_config_uri = matches
+            .remove_one::<String>("source-config")
+            .map(|uri_str| Uri::from_str(&uri_str))
+            .expect("`source-config` should be a required arg.")?;
+        Ok(UpdateSourceArgs {
+            client_args,
+            index_id,
+            source_id,
+            source_config_uri,
+        })
+    }
+
     fn parse_toggle_source_args(
         subcommand: &str,
         mut matches: ArgMatches,
@@ -337,6 +382,23 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> {
     Ok(())
 }
 
+async fn update_source_cli(args: UpdateSourceArgs) -> anyhow::Result<()> {
+    debug!(args=?args, "update-source");
+    println!("❯ Updating source...");
+    let storage_resolver = StorageResolver::unconfigured();
+    let source_config_content = load_file(&storage_resolver, &args.source_config_uri).await?;
+    let source_config_str: &str = std::str::from_utf8(&source_config_content)
+        .with_context(|| format!("source config is not utf-8: {}", args.source_config_uri))?;
+    let config_format = ConfigFormat::sniff_from_uri(&args.source_config_uri)?;
+    let qw_client = args.client_args.client();
+    qw_client
+        .sources(&args.index_id)
+        .update(&args.source_id, source_config_str, config_format)
+        .await?;
+    println!("{} Source successfully updated.", "✔".color(GREEN_COLOR));
+    Ok(())
+}
+
 async fn toggle_source_cli(args: ToggleSourceArgs) -> anyhow::Result<()> {
     debug!(args=?args, "toggle-source");
     println!("❯ Toggling source...");
@@ -599,6 +661,32 @@ mod tests {
         assert_eq!(command, expected_command);
     }
 
+    #[test]
+    fn test_parse_update_source_args() {
+        let app = build_cli().no_binary_name(true);
+        let matches = app
+            .try_get_matches_from(vec![
+                "source",
+                "update",
+                "--index",
+                "hdfs-logs",
+                "--source",
+                "kafka-foo",
+                "--source-config",
+                "/source-conf.yaml",
+            ])
+            .unwrap();
+        let command = CliCommand::parse_cli_args(matches).unwrap();
+        let expected_command =
+            CliCommand::Source(SourceCliCommand::UpdateSource(UpdateSourceArgs {
+                client_args: ClientArgs::default(),
+                index_id: "hdfs-logs".to_string(),
+                source_id: "kafka-foo".to_string(),
+                source_config_uri: Uri::from_str("file:///source-conf.yaml").unwrap(),
+            }));
+        assert_eq!(command, expected_command);
+    }
+
     #[test]
     fn test_parse_toggle_source_args() {
         {
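
To make the contract of `parse_update_args` easier to picture without the `clap` machinery, here is a standalone sketch that collects the same three required flags using only the standard library. It is an illustration under assumptions, not the code from this commit: `UpdateArgsSketch` and `parse_update_args_sketch` are hypothetical names, client arguments are ignored, and the config path is kept as a plain string rather than converted to a `Uri`.

```rust
/// Hypothetical stand-in for `UpdateSourceArgs` (no client args, no `Uri`).
#[derive(Debug, PartialEq, Eq)]
pub struct UpdateArgsSketch {
    pub index_id: String,
    pub source_id: String,
    pub source_config: String,
}

/// Parses `--index`, `--source` and `--source-config`, all of which are required.
pub fn parse_update_args_sketch(args: &[&str]) -> Result<UpdateArgsSketch, String> {
    let mut index_id = None;
    let mut source_id = None;
    let mut source_config = None;
    let mut iter = args.iter();
    while let Some(flag) = iter.next() {
        // Every flag in this sketch takes exactly one value.
        let value = iter
            .next()
            .ok_or_else(|| format!("missing value for `{flag}`"))?
            .to_string();
        match *flag {
            "--index" => index_id = Some(value),
            "--source" => source_id = Some(value),
            "--source-config" => source_config = Some(value),
            other => return Err(format!("unknown flag `{other}`")),
        }
    }
    Ok(UpdateArgsSketch {
        index_id: index_id.ok_or("`--index` is required")?,
        source_id: source_id.ok_or("`--source` is required")?,
        source_config: source_config.ok_or("`--source-config` is required")?,
    })
}
```

Unlike the real parser, which relies on `clap` to enforce required arguments and therefore uses `expect`, this sketch reports a missing flag as an error value.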

quickwit/quickwit-config/resources/tests/source_config/invalid-void-source.json

-8
This file was deleted.

quickwit/quickwit-config/src/index_config/mod.rs

+15-1
@@ -257,13 +257,27 @@ pub struct IndexConfig {
 
 impl IndexConfig {
     /// Return a fingerprint of parameters relevant for indexers
-    pub fn indexing_params_fingerprint(&self) -> u64 {
+    ///
+    /// This should remain private to this crate to avoid confusion with the
+    /// full indexing pipeline fingerprint that also includes the source's
+    /// fingerprint.
+    pub(crate) fn indexing_params_fingerprint(&self) -> u64 {
         let mut hasher = SipHasher::new();
         self.doc_mapping.doc_mapping_uid.hash(&mut hasher);
         self.indexing_settings.hash(&mut hasher);
         hasher.finish()
     }
 
+    /// Compares IndexConfig level fingerprints
+    ///
+    /// This method is meant to enable IndexConfig level fingerprint comparison
+    /// without taking the risk of mixing them up with pipeline level
+    /// fingerprints (computed by
+    /// [`crate::indexing_pipeline_params_fingerprint()`]).
+    pub fn equals_fingerprint(&self, other: &Self) -> bool {
+        self.indexing_params_fingerprint() == other.indexing_params_fingerprint()
+    }
+
     #[cfg(any(test, feature = "testsuite"))]
     pub fn for_test(index_id: &str, index_uri: &str) -> Self {
         let index_uri = Uri::from_str(index_uri).unwrap();
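
The pattern in this hunk, hiding the raw fingerprint and exposing only a comparison, can be sketched in isolation. The block below is a simplified illustration under assumptions: `IndexConfigSketch` and its fields are hypothetical stand-ins for `IndexConfig`, and the standard library's `DefaultHasher` replaces `SipHasher` from the `siphasher` crate so that the example has no dependencies.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for `IndexConfig` with only a few fields.
#[derive(Debug, Clone)]
pub struct IndexConfigSketch {
    /// Not part of the fingerprint in this sketch.
    pub index_id: String,
    pub doc_mapping_uid: u64,
    pub commit_timeout_secs: u64,
}

impl IndexConfigSketch {
    /// Private on purpose: callers cannot obtain the raw value and confuse it
    /// with a pipeline-level fingerprint.
    fn indexing_params_fingerprint(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.doc_mapping_uid.hash(&mut hasher);
        self.commit_timeout_secs.hash(&mut hasher);
        hasher.finish()
    }

    /// The only public entry point: compares two configs by fingerprint.
    pub fn equals_fingerprint(&self, other: &Self) -> bool {
        self.indexing_params_fingerprint() == other.indexing_params_fingerprint()
    }
}
```

Because only the hashed fields take part in the comparison, two configs that differ in `index_id` alone compare as equal here.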

quickwit/quickwit-config/src/lib.rs

+19-5
@@ -14,6 +14,7 @@
 
 #![deny(clippy::disallowed_methods)]
 
+use std::hash::Hasher;
 use std::str::FromStr;
 
 use anyhow::{bail, ensure, Context};
@@ -50,13 +51,14 @@ pub use quickwit_doc_mapper::DocMapping;
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 use serde_json::Value as JsonValue;
+use siphasher::sip::SipHasher;
 use source_config::FileSourceParamsForSerde;
 pub use source_config::{
-    load_source_config_from_user_config, FileSourceMessageType, FileSourceNotification,
-    FileSourceParams, FileSourceSqs, KafkaSourceParams, KinesisSourceParams, PubSubSourceParams,
-    PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat,
-    SourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_SOURCE_ID,
-    INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
+    load_source_config_from_user_config, load_source_config_update, FileSourceMessageType,
+    FileSourceNotification, FileSourceParams, FileSourceSqs, KafkaSourceParams,
+    KinesisSourceParams, PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams,
+    RegionOrEndpoint, SourceConfig, SourceInputFormat, SourceParams, TransformConfig,
+    VecSourceParams, VoidSourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
 };
 use tracing::warn;
 
@@ -281,6 +283,18 @@ pub trait TestableForRegression: Serialize + DeserializeOwned {
     fn assert_equality(&self, other: &Self);
 }
 
+/// Returns a fingerprint (a hash) of all the parameters that should force an
+/// indexing pipeline to restart upon index or source config updates.
+pub fn indexing_pipeline_params_fingerprint(
+    index_config: &IndexConfig,
+    source_config: &SourceConfig,
+) -> u64 {
+    let mut hasher = SipHasher::new();
+    hasher.write_u64(index_config.indexing_params_fingerprint());
+    hasher.write_u64(source_config.indexing_params_fingerprint());
+    hasher.finish()
+}
+
 #[cfg(test)]
 mod tests {
     use super::validate_identifier;
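
`indexing_pipeline_params_fingerprint` feeds the index-level and source-level fingerprints into one hasher, so a change on either side produces a different pipeline fingerprint. A dependency-free sketch of that combination follows. It rests on assumptions: `combine_fingerprints` and `should_restart_pipeline` are hypothetical names, and `DefaultHasher` from the standard library stands in for `SipHasher`, so the values differ from those Quickwit computes.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

/// Combines two fingerprints by writing them, in order, into a single hasher.
pub fn combine_fingerprints(index_fingerprint: u64, source_fingerprint: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    hasher.write_u64(index_fingerprint);
    hasher.write_u64(source_fingerprint);
    hasher.finish()
}

/// Hypothetical restart rule: restart when the planned fingerprint differs
/// from the fingerprint of the pipeline that is currently running.
pub fn should_restart_pipeline(running_fingerprint: u64, planned_fingerprint: u64) -> bool {
    running_fingerprint != planned_fingerprint
}
```

Hashing the two values in sequence, rather than XOR-ing them for instance, keeps the result sensitive to which side changed: swapping the index and source fingerprints yields a different combined value.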
