Skip to content

Commit 14e6abc

Browse files
committed
Fix rest handler update test
1 parent ea13a18 commit 14e6abc

File tree

5 files changed

+104
-54
lines changed

5 files changed

+104
-54
lines changed

quickwit/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-indexing/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ tracing = { workspace = true }
4747
ulid = { workspace = true }
4848
utoipa = { workspace = true }
4949
vrl = { workspace = true, optional = true }
50+
warp = { workspace = true, optional = true }
5051

5152
quickwit-actors = { workspace = true }
5253
quickwit-aws = { workspace = true }
@@ -84,7 +85,8 @@ sqs = [
8485
"queue-sources",
8586
"quickwit-aws/sqs",
8687
]
87-
sqs-localstack-tests = []
88+
sqs-test-helpers = ["warp"]
89+
sqs-localstack-tests = ["sqs-test-helpers"]
8890
vendored-kafka = [
8991
"kafka",
9092
"libz-sys/static",

quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs

+27-1
Original file line numberDiff line numberDiff line change
@@ -261,10 +261,11 @@ pub(crate) async fn check_connectivity(queue_url: &str) -> anyhow::Result<()> {
261261
Ok(())
262262
}
263263

264-
#[cfg(feature = "sqs-localstack-tests")]
264+
#[cfg(feature = "sqs-test-helpers")]
265265
pub mod test_helpers {
266266
use aws_sdk_sqs::types::QueueAttributeName;
267267
use ulid::Ulid;
268+
use warp::Filter;
268269

269270
use super::*;
270271

@@ -316,6 +317,31 @@ pub mod test_helpers {
316317
.unwrap()
317318
.to_string()
318319
}
320+
321+
/// Runs a mock SQS GetQueueAttributes endpoint to enable creating SQS
322+
/// sources that pass the connectivity check
323+
///
324+
/// Rerturns the queue URL to use for the source and a guard for the
325+
/// temporary mock server
326+
pub fn start_mock_sqs_get_queue_attributes_endpoint() -> (String, oneshot::Sender<()>) {
327+
let hello = warp::path!().map(|| "{}");
328+
let (tx, rx) = oneshot::channel();
329+
let (addr, server) =
330+
warp::serve(hello).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async {
331+
rx.await.ok();
332+
});
333+
tokio::spawn(server);
334+
let queue_url = format!("http://{}:{}/", addr.ip(), addr.port());
335+
(queue_url, tx)
336+
}
337+
338+
#[tokio::test]
339+
async fn test_mock_sqs_get_queue_attributes_endpoint() {
340+
let (queue_url, _shutdown) = start_mock_sqs_get_queue_attributes_endpoint();
341+
check_connectivity(&queue_url).await.unwrap();
342+
drop(_shutdown);
343+
check_connectivity(&queue_url).await.unwrap_err();
344+
}
319345
}
320346

321347
#[cfg(test)]

quickwit/quickwit-serve/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,7 @@ pprof = [
9898
"dep:pprof"
9999
]
100100
testsuite = []
101+
sqs-for-tests = [
102+
"quickwit-indexing/sqs",
103+
"quickwit-indexing/sqs-test-helpers"
104+
]

quickwit/quickwit-serve/src/index_api/rest_handler.rs

+69-52
Original file line numberDiff line numberDiff line change
@@ -1111,9 +1111,13 @@ mod tests {
11111111
}
11121112
}
11131113

1114+
#[cfg(feature = "sqs-for-tests")]
11141115
#[tokio::test]
11151116
async fn test_update_source() {
1117+
use quickwit_indexing::source::sqs_queue::test_helpers::start_mock_sqs_get_queue_attributes_endpoint;
1118+
11161119
let metastore = metastore_for_test();
1120+
let (queue_url, _guard) = start_mock_sqs_get_queue_attributes_endpoint();
11171121
let index_service = IndexService::new(metastore.clone(), StorageResolver::unconfigured());
11181122
let mut node_config = NodeConfig::for_test();
11191123
node_config.default_index_root_uri = Uri::for_test("file:///default-index-root-uri");
@@ -1137,64 +1141,77 @@ mod tests {
11371141
assert_json_include!(actual: resp_json, expected: expected_response_json);
11381142

11391143
// Create source.
1140-
let source_config_body = r#"{"version": "0.7", "source_id": "vec-source", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 10}}"#;
1144+
let source_config_body = serde_json::json!({
1145+
"version": "0.7",
1146+
"source_id": "sqs-source",
1147+
"source_type": "file",
1148+
"params": {"notifications": [{"type": "sqs", "queue_url": queue_url, "message_type": "s3_notification"}]},
1149+
});
11411150
let resp = warp::test::request()
11421151
.path("/indexes/hdfs-logs/sources")
11431152
.method("POST")
1144-
.json(&true)
1145-
.body(source_config_body)
1153+
.json(&source_config_body)
11461154
.reply(&index_management_handler)
11471155
.await;
1148-
assert_eq!(resp.status(), 200);
1149-
1150-
// Update the source.
1151-
let update_source_config_body = r#"{"version": "0.7", "source_id": "vec-source", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 20}}"#;
1152-
let resp = warp::test::request()
1153-
.path("/indexes/hdfs-logs/sources/vec-source")
1154-
.method("PUT")
1155-
.json(&true)
1156-
.body(update_source_config_body)
1157-
.reply(&index_management_handler)
1158-
.await;
1159-
assert_eq!(resp.status(), 200);
1160-
// Check that the source has been updated.
1161-
let index_metadata = metastore
1162-
.index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string()))
1163-
.await
1164-
.unwrap()
1165-
.deserialize_index_metadata()
1166-
.unwrap();
1167-
assert!(index_metadata.sources.contains_key("vec-source"));
1168-
let source_config = index_metadata.sources.get("vec-source").unwrap();
1169-
assert_eq!(source_config.source_type(), SourceType::Vec);
1170-
assert_eq!(
1171-
source_config.source_params,
1172-
SourceParams::Vec(VecSourceParams {
1173-
docs: Vec::new(),
1174-
batch_num_docs: 20,
1175-
partition: "".to_string(),
1176-
})
1177-
);
1156+
let resp_body = std::str::from_utf8(resp.body()).unwrap();
1157+
assert_eq!(resp.status(), 200, "{resp_body}");
11781158

1179-
// Update the source with a different source_id (forbidden)
1180-
let update_source_config_body = r#"{"version": "0.7", "source_id": "other-source-id", "source_type": "vec", "params": {"docs": [], "batch_num_docs": 20}}"#;
1181-
let resp = warp::test::request()
1182-
.path("/indexes/hdfs-logs/sources/vec-source")
1183-
.method("PUT")
1184-
.json(&true)
1185-
.body(update_source_config_body)
1186-
.reply(&index_management_handler)
1187-
.await;
1188-
assert_eq!(resp.status(), 400);
1189-
// Check that the source hasn't been updated.
1190-
let index_metadata = metastore
1191-
.index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string()))
1192-
.await
1193-
.unwrap()
1194-
.deserialize_index_metadata()
1195-
.unwrap();
1196-
assert!(index_metadata.sources.contains_key("vec-source"));
1197-
assert!(!index_metadata.sources.contains_key("other-source-id"));
1159+
{
1160+
// Update the source.
1161+
let update_source_config_body = serde_json::json!({
1162+
"version": "0.7",
1163+
"source_id": "sqs-source",
1164+
"source_type": "file",
1165+
"params": {"notifications": [{"type": "sqs", "queue_url": queue_url, "message_type": "s3_notification"}]},
1166+
});
1167+
let resp = warp::test::request()
1168+
.path("/indexes/hdfs-logs/sources/sqs-source")
1169+
.method("PUT")
1170+
.json(&update_source_config_body)
1171+
.reply(&index_management_handler)
1172+
.await;
1173+
let resp_body = std::str::from_utf8(resp.body()).unwrap();
1174+
assert_eq!(resp.status(), 200, "{resp_body}");
1175+
// Check that the source has been updated.
1176+
let index_metadata = metastore
1177+
.index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string()))
1178+
.await
1179+
.unwrap()
1180+
.deserialize_index_metadata()
1181+
.unwrap();
1182+
let metastore_source_config = index_metadata.sources.get("sqs-source").unwrap();
1183+
assert_eq!(metastore_source_config.source_type(), SourceType::File);
1184+
assert_eq!(
1185+
metastore_source_config,
1186+
&serde_json::from_value(update_source_config_body).unwrap(),
1187+
);
1188+
}
1189+
{
1190+
// Update the source with a different source_id (forbidden)
1191+
let update_source_config_body = serde_json::json!({
1192+
"version": "0.7",
1193+
"source_id": "new-source-id",
1194+
"source_type": "file",
1195+
"params": {"notifications": [{"type": "sqs", "queue_url": queue_url, "message_type": "s3_notification"}]},
1196+
});
1197+
let resp = warp::test::request()
1198+
.path("/indexes/hdfs-logs/sources/sqs-source")
1199+
.method("PUT")
1200+
.json(&update_source_config_body)
1201+
.reply(&index_management_handler)
1202+
.await;
1203+
let resp_body = std::str::from_utf8(resp.body()).unwrap();
1204+
assert_eq!(resp.status(), 400, "{resp_body}");
1205+
// Check that the source hasn't been updated.
1206+
let index_metadata = metastore
1207+
.index_metadata(IndexMetadataRequest::for_index_id("hdfs-logs".to_string()))
1208+
.await
1209+
.unwrap()
1210+
.deserialize_index_metadata()
1211+
.unwrap();
1212+
assert!(index_metadata.sources.contains_key("sqs-source"));
1213+
assert!(!index_metadata.sources.contains_key("other-source-id"));
1214+
}
11981215
}
11991216

12001217
#[tokio::test]

0 commit comments

Comments
 (0)