Skip to content

Commit 10bdda8

Browse files
committed
Add high throughput integration test
1 parent 7bc495a commit 10bdda8

File tree

8 files changed

+235
-102
lines changed

8 files changed

+235
-102
lines changed

docs/internals/ingest-v2.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ Ingest V2 is the latest ingestion API that is designed to be more efficient and
44

55
## Architecture
66

7-
Just like ingest V1, the new ingest uses [`mrecordlog`](https://github.com/quickwit-oss/mrecordlog) to persist ingested documents that are waiting to be indexed. But unlike V1, which always persists the documents locally on the node that receives them, ingest V2 can dynamically distribute them into WAL units called _shards_. The assigned shard can be local or on another indexer. The control plane is in charge of distributing the shards to balance the indexing work as well as possible across all indexer nodes. The progress within each shard is not tracked as an index metadata checkpoint anymore but in a dedicated metastore `shards` table.
7+
Just like ingest V1, the new ingest uses [`mrecordlog`](https://github.com/quickwit-oss/mrecordlog) to persist ingested documents that are waiting to be indexed. But unlike V1, which always persists the documents locally on the node that receives them, ingest V2 can dynamically distribute them into WAL units called _shards_. Here are a few key behaviors of this new mechanism:
8+
- When an indexer receives a document for ingestion, the assigned shard can be local or on another indexer.
9+
- The control plane is in charge of distributing the shards to balance the indexing work as well as possible across all indexer nodes.
10+
- Each shard has a throughput limit (5MB). If the ingest rate on an index is becoming greater than the cumulated throughput of all its shards, the control plane schedules the creation of new shards. Note that when the cumulated throughput is exceeded on an index, the ingest API returns "too many requests" errors until the new shards are effectively created.
11+
- The progress within each shard is tracked in a dedicated metastore `shards` table (instead of the index metadata checkpoint like for other sources).
812

913
In the future, the shard based ingest will also be capable of writing a replica for each shard, thus ensuring a high durability of the documents that are waiting to be indexed (durability of the indexed documents is guarantied by the object store).
1014

@@ -33,3 +37,4 @@ See [full configuration example](https://github.com/quickwit-oss/quickwit/blob/m
3337
- `ingest_api.replication_factor`, not working yet
3438
- ingest V1 always writes to the WAL of the node receiving the request, V2 potentially forwards it to another node, dynamically assigned by the control plane to distribute the indexing work more evenly.
3539
- ingest V2 parses and validates input documents synchronously. Schema and JSON formatting errors are returned in the ingest response (for ingest V1 those errors were available in the server logs only).
40+
- ingest V2 returns transient 429 (too many requests) errors when the ingestion rate is too fast

docs/operating/upgrades.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
title: Version 0.7 upgrade
2+
title: Version upgrade
33
sidebar_position: 4
44
---
55

quickwit/quickwit-cli/src/index.rs

+7
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,13 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
10711071
println!("└ document: {}", failure.document);
10721072
}
10731073
}
1074+
if response.num_too_many_requests > 0 {
1075+
println!("Retried request counts:");
1076+
println!(
1077+
" 429 (too many requests) = {}",
1078+
response.num_too_many_requests
1079+
);
1080+
}
10741081
Ok(())
10751082
}
10761083

quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,12 @@ use quickwit_proto::jaeger::storage::v1::span_reader_plugin_client::SpanReaderPl
3434
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient;
3535
use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_client::TraceServiceClient;
3636
use quickwit_proto::types::NodeId;
37-
use quickwit_rest_client::models::IngestSource;
37+
use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource};
3838
use quickwit_rest_client::rest_client::{
3939
CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL,
4040
};
4141
use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver;
42-
use quickwit_serve::{
43-
serve_quickwit, ListSplitsQueryParams, RestIngestResponse, SearchRequestQueryString,
44-
};
42+
use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString};
4543
use quickwit_storage::StorageResolver;
4644
use reqwest::Url;
4745
use serde_json::Value;
@@ -243,7 +241,7 @@ pub(crate) async fn ingest(
243241
index_id: &str,
244242
ingest_source: IngestSource,
245243
commit_type: CommitType,
246-
) -> anyhow::Result<RestIngestResponse> {
244+
) -> anyhow::Result<CumulatedIngestResponse> {
247245
let resp = client
248246
.ingest(index_id, ingest_source, None, None, commit_type)
249247
.await?;

quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs

+86-14
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::time::Duration;
15+
use std::time::{Duration, Instant};
1616

1717
use futures_util::FutureExt;
1818
use itertools::Itertools;
@@ -23,9 +23,9 @@ use quickwit_indexing::actors::INDEXING_DIR_NAME;
2323
use quickwit_metastore::SplitState;
2424
use quickwit_proto::ingest::ParseFailureReason;
2525
use quickwit_rest_client::error::{ApiError, Error};
26-
use quickwit_rest_client::models::IngestSource;
26+
use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource};
2727
use quickwit_rest_client::rest_client::CommitType;
28-
use quickwit_serve::{ListSplitsQueryParams, RestIngestResponse, RestParseFailure};
28+
use quickwit_serve::{ListSplitsQueryParams, RestParseFailure, SearchRequestQueryString};
2929
use serde_json::json;
3030

3131
use crate::ingest_json;
@@ -306,11 +306,11 @@ async fn test_ingest_v2_happy_path() {
306306
.unwrap();
307307
assert_eq!(
308308
ingest_resp,
309-
RestIngestResponse {
309+
CumulatedIngestResponse {
310310
num_docs_for_processing: 1,
311311
num_ingested_docs: Some(1),
312312
num_rejected_docs: Some(0),
313-
parse_failures: None,
313+
..Default::default()
314314
},
315315
);
316316

@@ -332,6 +332,77 @@ async fn test_ingest_v2_happy_path() {
332332
sandbox.shutdown().await.unwrap();
333333
}
334334

335+
#[tokio::test]
336+
async fn test_ingest_v2_high_throughput() {
337+
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
338+
let index_id = "test_high_throughput";
339+
let index_config = format!(
340+
r#"
341+
version: 0.8
342+
index_id: {index_id}
343+
doc_mapping:
344+
field_mappings:
345+
- name: body
346+
type: text
347+
indexing_settings:
348+
commit_timeout_secs: 1
349+
"#
350+
);
351+
sandbox
352+
.rest_client(QuickwitService::Indexer)
353+
.indexes()
354+
.create(index_config, ConfigFormat::Yaml, false)
355+
.await
356+
.unwrap();
357+
358+
let body_size = 20 * 1000 * 1000;
359+
let line = json!({"body": "my dummy repeated payload"}).to_string();
360+
let num_docs = body_size / line.len();
361+
let body = std::iter::repeat_n(&line, num_docs).join("\n");
362+
let ingest_resp = ingest(
363+
&sandbox.rest_client(QuickwitService::Indexer),
364+
index_id,
365+
IngestSource::Str(body),
366+
CommitType::Auto,
367+
)
368+
.await
369+
.unwrap();
370+
assert_eq!(ingest_resp.num_docs_for_processing, num_docs as u64);
371+
assert_eq!(ingest_resp.num_ingested_docs, Some(num_docs as u64));
372+
assert_eq!(ingest_resp.num_rejected_docs, Some(0));
373+
// num_too_many_requests might actually be > 0
374+
375+
let searcher_client = sandbox.rest_client(QuickwitService::Searcher);
376+
// wait for the docs to be indexed
377+
let start_time = Instant::now();
378+
loop {
379+
let res = searcher_client
380+
.search(
381+
index_id,
382+
SearchRequestQueryString {
383+
query: "*".to_string(),
384+
..Default::default()
385+
},
386+
)
387+
.await;
388+
if let Ok(success_resp) = res {
389+
if success_resp.num_hits == num_docs as u64 {
390+
break;
391+
}
392+
}
393+
if start_time.elapsed() > Duration::from_secs(20) {
394+
panic!(
395+
"didn't manage to index {} docs in {:?}",
396+
num_docs,
397+
start_time.elapsed()
398+
);
399+
}
400+
tokio::time::sleep(Duration::from_secs(1)).await;
401+
}
402+
403+
sandbox.shutdown().await.unwrap();
404+
}
405+
335406
#[tokio::test]
336407
async fn test_commit_force() {
337408
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
@@ -372,11 +443,11 @@ async fn test_commit_force() {
372443
.unwrap();
373444
assert_eq!(
374445
ingest_resp,
375-
RestIngestResponse {
446+
CumulatedIngestResponse {
376447
num_docs_for_processing: 1,
377448
num_ingested_docs: Some(1),
378449
num_rejected_docs: Some(0),
379-
parse_failures: None,
450+
..Default::default()
380451
},
381452
);
382453

@@ -452,20 +523,20 @@ async fn test_commit_wait_for() {
452523
let (ingest_resp_1, ingest_resp_2) = tokio::join!(ingest_1_fut, ingest_2_fut);
453524
assert_eq!(
454525
ingest_resp_1,
455-
RestIngestResponse {
526+
CumulatedIngestResponse {
456527
num_docs_for_processing: 1,
457528
num_ingested_docs: Some(1),
458529
num_rejected_docs: Some(0),
459-
parse_failures: None,
530+
..Default::default()
460531
},
461532
);
462533
assert_eq!(
463534
ingest_resp_2,
464-
RestIngestResponse {
535+
CumulatedIngestResponse {
465536
num_docs_for_processing: 1,
466537
num_ingested_docs: Some(1),
467538
num_rejected_docs: Some(0),
468-
parse_failures: None,
539+
..Default::default()
469540
},
470541
);
471542

@@ -523,11 +594,11 @@ async fn test_commit_auto() {
523594
.unwrap();
524595
assert_eq!(
525596
ingest_resp,
526-
RestIngestResponse {
597+
CumulatedIngestResponse {
527598
num_docs_for_processing: 1,
528599
num_ingested_docs: Some(1),
529600
num_rejected_docs: Some(0),
530-
parse_failures: None,
601+
..Default::default()
531602
},
532603
);
533604

@@ -577,7 +648,7 @@ async fn test_detailed_ingest_response() {
577648

578649
assert_eq!(
579650
ingest_resp,
580-
RestIngestResponse {
651+
CumulatedIngestResponse {
581652
num_docs_for_processing: 2,
582653
num_ingested_docs: Some(1),
583654
num_rejected_docs: Some(1),
@@ -586,6 +657,7 @@ async fn test_detailed_ingest_response() {
586657
message: "failed to parse JSON document".to_string(),
587658
reason: ParseFailureReason::InvalidJson,
588659
}]),
660+
..Default::default()
589661
},
590662
);
591663
sandbox.shutdown().await.unwrap();

quickwit/quickwit-rest-client/src/models.rs

+90
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::path::PathBuf;
1616
use std::time::Duration;
1717

18+
use quickwit_serve::{RestIngestResponse, RestParseFailure};
1819
use reqwest::StatusCode;
1920
use serde::de::DeserializeOwned;
2021
use serde::{Deserialize, Serialize};
@@ -94,6 +95,43 @@ pub enum IngestSource {
9495
Stdin,
9596
}
9697

98+
#[derive(Debug, PartialEq, Default)]
99+
pub struct CumulatedIngestResponse {
100+
pub num_docs_for_processing: u64,
101+
pub num_ingested_docs: Option<u64>,
102+
pub num_rejected_docs: Option<u64>,
103+
pub parse_failures: Option<Vec<RestParseFailure>>,
104+
pub num_too_many_requests: u64,
105+
}
106+
107+
impl CumulatedIngestResponse {
108+
/// Aggregates ingest counts and errors.
109+
pub fn merge(self, other: RestIngestResponse) -> Self {
110+
Self {
111+
num_docs_for_processing: self.num_docs_for_processing + other.num_docs_for_processing,
112+
num_ingested_docs: apply_op(self.num_ingested_docs, other.num_ingested_docs, |a, b| {
113+
a + b
114+
}),
115+
num_rejected_docs: apply_op(self.num_rejected_docs, other.num_rejected_docs, |a, b| {
116+
a + b
117+
}),
118+
parse_failures: apply_op(self.parse_failures, other.parse_failures, |a, b| {
119+
a.into_iter().chain(b).collect()
120+
}),
121+
num_too_many_requests: self.num_too_many_requests,
122+
}
123+
}
124+
}
125+
126+
fn apply_op<T>(a: Option<T>, b: Option<T>, f: impl Fn(T, T) -> T) -> Option<T> {
127+
match (a, b) {
128+
(Some(a), Some(b)) => Some(f(a, b)),
129+
(Some(a), None) => Some(a),
130+
(None, Some(b)) => Some(b),
131+
(None, None) => None,
132+
}
133+
}
134+
97135
/// A structure that represent a timeout. Unlike Duration it can also represent an infinite or no
98136
/// timeout value.
99137
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Debug)]
@@ -149,3 +187,55 @@ impl Timeout {
149187
}
150188
}
151189
}
190+
191+
#[cfg(test)]
192+
mod test {
193+
use quickwit_proto::ingest::ParseFailureReason;
194+
195+
use super::*;
196+
197+
#[test]
198+
fn test_merge_responses() {
199+
let mut merged_response = CumulatedIngestResponse::default();
200+
let response1 = RestIngestResponse {
201+
num_docs_for_processing: 10,
202+
num_ingested_docs: Some(5),
203+
num_rejected_docs: Some(2),
204+
parse_failures: Some(vec![RestParseFailure {
205+
message: "error1".to_string(),
206+
document: "doc1".to_string(),
207+
reason: ParseFailureReason::InvalidJson,
208+
}]),
209+
};
210+
let response2 = RestIngestResponse {
211+
num_docs_for_processing: 15,
212+
num_ingested_docs: Some(10),
213+
num_rejected_docs: Some(3),
214+
parse_failures: Some(vec![RestParseFailure {
215+
message: "error2".to_string(),
216+
document: "doc2".to_string(),
217+
reason: ParseFailureReason::InvalidJson,
218+
}]),
219+
};
220+
merged_response = merged_response.merge(response1);
221+
merged_response = merged_response.merge(response2);
222+
assert_eq!(merged_response.num_docs_for_processing, 25);
223+
assert_eq!(merged_response.num_ingested_docs.unwrap(), 15);
224+
assert_eq!(merged_response.num_rejected_docs.unwrap(), 5);
225+
assert_eq!(
226+
merged_response.parse_failures.unwrap(),
227+
vec![
228+
RestParseFailure {
229+
message: "error1".to_string(),
230+
document: "doc1".to_string(),
231+
reason: ParseFailureReason::InvalidJson,
232+
},
233+
RestParseFailure {
234+
message: "error2".to_string(),
235+
document: "doc2".to_string(),
236+
reason: ParseFailureReason::InvalidJson,
237+
}
238+
]
239+
);
240+
}
241+
}

0 commit comments

Comments
 (0)