diff --git a/sdk/cosmos/azure_data_cosmos/Cargo.toml b/sdk/cosmos/azure_data_cosmos/Cargo.toml index 129b81b51d..885d8b85d7 100644 --- a/sdk/cosmos/azure_data_cosmos/Cargo.toml +++ b/sdk/cosmos/azure_data_cosmos/Cargo.toml @@ -38,7 +38,7 @@ workspace = true [features] default = ["hmac_rust"] key_auth = [] # Enables support for key-based authentication (Primary Keys and Resource Tokens) -preview_query_engine = [] # Enables support for the PREVIEW external query engine +preview_query_engine = ["serde_json/raw_value"] # Enables support for the PREVIEW external query engine hmac_rust = ["azure_core/hmac_rust"] hmac_openssl = ["azure_core/hmac_openssl"] diff --git a/sdk/cosmos/azure_data_cosmos/src/query/engine.rs b/sdk/cosmos/azure_data_cosmos/src/query/engine.rs index 7398f252d3..8cfe60e7f1 100644 --- a/sdk/cosmos/azure_data_cosmos/src/query/engine.rs +++ b/sdk/cosmos/azure_data_cosmos/src/query/engine.rs @@ -20,7 +20,7 @@ pub struct PipelineResult { pub is_completed: bool, /// The items yielded by the pipeline. - pub items: Vec>, + pub items: Vec>, /// Additional requests that must be made before the pipeline can continue. pub requests: Vec, diff --git a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs index ca27ac8f25..14780e5dad 100644 --- a/sdk/cosmos/azure_data_cosmos/src/query/executor.rs +++ b/sdk/cosmos/azure_data_cosmos/src/query/executor.rs @@ -15,7 +15,7 @@ pub struct QueryExecutor { items_link: ResourceLink, context: Context<'static>, query_engine: QueryEngineRef, - base_request: Request, + base_request: Option, query: Query, pipeline: Option, @@ -37,15 +37,13 @@ impl QueryExecutor { ) -> azure_core::Result { let items_link = container_link.feed(ResourceType::Items); let context = options.method_options.context.into_owned(); - let base_request = - pipeline::create_base_query_request(http_pipeline.url(&items_link), &query)?; Ok(Self { http_pipeline, container_link, items_link, context, query_engine, - base_request, + base_request: None, query, pipeline: None, phantom: std::marker::PhantomData, @@ -69,8 +67,13 @@ impl QueryExecutor { /// An item to yield, or None if execution is complete. #[tracing::instrument(skip_all)] async fn step(&mut self) -> azure_core::Result>> { - let pipeline = match self.pipeline.as_mut() { - Some(pipeline) => pipeline, + let (pipeline, base_request) = match self.pipeline.as_mut() { + Some(pipeline) => ( + pipeline, + self.base_request + .as_ref() + .expect("base_request should be set when pipeline is set"), + ), None => { // Initialize the pipeline. let query_plan = get_query_plan( @@ -97,8 +100,16 @@ impl QueryExecutor { let pipeline = self.query_engine .create_pipeline(&self.query.text, &query_plan, &pkranges)?; + self.query.text = pipeline.query().into(); + self.base_request = Some(crate::pipeline::create_base_query_request( + self.http_pipeline.url(&self.items_link), + &self.query, + )?); self.pipeline = Some(pipeline); - self.pipeline.as_mut().expect("we just set it") + ( + self.pipeline.as_mut().expect("we just set it"), + self.base_request.as_ref().expect("we just set it"), + ) } }; @@ -113,7 +124,7 @@ impl QueryExecutor { let items = results .items .into_iter() - .map(|item| serde_json::from_slice::(&item)) + .map(|item| serde_json::from_str::(item.get())) .collect::, _>>()?; // TODO: Provide a continuation token. @@ -122,7 +133,7 @@ impl QueryExecutor { // No items, so make any requests we need to make and provide them to the pipeline. for request in results.requests { - let mut query_request = self.base_request.clone(); + let mut query_request = base_request.clone(); query_request.insert_header( constants::PARTITION_KEY_RANGE_ID, request.partition_key_range_id.clone(), diff --git a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs index 9df774ce0b..fefaa9bd76 100644 --- a/sdk/cosmos/azure_data_cosmos/src/query/mod.rs +++ b/sdk/cosmos/azure_data_cosmos/src/query/mod.rs @@ -68,7 +68,7 @@ pub use engine::*; pub struct Query { /// The query text itself. #[serde(rename = "query")] - text: String, + pub(crate) text: String, /// A list of parameters used in the query and their associated value. #[serde(skip_serializing_if = "Vec::is_empty")] // Don't serialize an empty array. diff --git a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs index 4730b7f8e3..c55e7f39d5 100644 --- a/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs +++ b/sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs @@ -6,6 +6,7 @@ use std::{collections::VecDeque, sync::Mutex}; use serde::{Deserialize, Serialize}; use azure_data_cosmos::query::{PipelineResult, QueryEngine, QueryPipeline}; +use serde_json::value::RawValue; #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -104,10 +105,10 @@ impl PartitionState { self.next_continuation = continuation; } - pub fn pop_item(&mut self) -> azure_core::Result>> { + pub fn pop_item(&mut self) -> azure_core::Result>> { match self.queue.pop_front() { Some(item) => { - let item = serde_json::to_vec(&item)?; + let item = serde_json::value::to_raw_value(&item)?; Ok(Some(item)) } None => Ok(None),