Skip to content

Commit ae0e2cd

Browse files
authored
Cosmos: fix #2616 by using RawValue, and fix query rewriting (#2617)
* fix #2616 by using RawValue, and fix query rewriting * remove unused variable * replace 'expect' with 'unwrap'
1 parent 6f836fa commit ae0e2cd

File tree

5 files changed

+26
-14
lines changed

5 files changed

+26
-14
lines changed

sdk/cosmos/azure_data_cosmos/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ workspace = true
3838
[features]
3939
default = ["hmac_rust"]
4040
key_auth = [] # Enables support for key-based authentication (Primary Keys and Resource Tokens)
41-
preview_query_engine = [] # Enables support for the PREVIEW external query engine
41+
preview_query_engine = ["serde_json/raw_value"] # Enables support for the PREVIEW external query engine
4242
hmac_rust = ["azure_core/hmac_rust"]
4343
hmac_openssl = ["azure_core/hmac_openssl"]
4444

sdk/cosmos/azure_data_cosmos/src/query/engine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub struct PipelineResult {
2020
pub is_completed: bool,
2121

2222
/// The items yielded by the pipeline.
23-
pub items: Vec<Vec<u8>>,
23+
pub items: Vec<Box<serde_json::value::RawValue>>,
2424

2525
/// Additional requests that must be made before the pipeline can continue.
2626
pub requests: Vec<QueryRequest>,

sdk/cosmos/azure_data_cosmos/src/query/executor.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct QueryExecutor<T: DeserializeOwned> {
1515
items_link: ResourceLink,
1616
context: Context<'static>,
1717
query_engine: QueryEngineRef,
18-
base_request: Request,
18+
base_request: Option<Request>,
1919
query: Query,
2020
pipeline: Option<OwnedQueryPipeline>,
2121

@@ -37,15 +37,13 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
3737
) -> azure_core::Result<Self> {
3838
let items_link = container_link.feed(ResourceType::Items);
3939
let context = options.method_options.context.into_owned();
40-
let base_request =
41-
pipeline::create_base_query_request(http_pipeline.url(&items_link), &query)?;
4240
Ok(Self {
4341
http_pipeline,
4442
container_link,
4543
items_link,
4644
context,
4745
query_engine,
48-
base_request,
46+
base_request: None,
4947
query,
5048
pipeline: None,
5149
phantom: std::marker::PhantomData,
@@ -69,8 +67,13 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
6967
/// An item to yield, or None if execution is complete.
7068
#[tracing::instrument(skip_all)]
7169
async fn step(&mut self) -> azure_core::Result<Option<FeedPage<T>>> {
72-
let pipeline = match self.pipeline.as_mut() {
73-
Some(pipeline) => pipeline,
70+
let (pipeline, base_request) = match self.pipeline.as_mut() {
71+
Some(pipeline) => (
72+
pipeline,
73+
self.base_request
74+
.as_ref()
75+
.expect("base_request should be set when pipeline is set"),
76+
),
7477
None => {
7578
// Initialize the pipeline.
7679
let query_plan = get_query_plan(
@@ -97,8 +100,16 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
97100
let pipeline =
98101
self.query_engine
99102
.create_pipeline(&self.query.text, &query_plan, &pkranges)?;
103+
self.query.text = pipeline.query().into();
104+
self.base_request = Some(crate::pipeline::create_base_query_request(
105+
self.http_pipeline.url(&self.items_link),
106+
&self.query,
107+
)?);
100108
self.pipeline = Some(pipeline);
101-
self.pipeline.as_mut().expect("we just set it")
109+
(
110+
self.pipeline.as_mut().unwrap(),
111+
self.base_request.as_ref().unwrap(),
112+
)
102113
}
103114
};
104115

@@ -113,7 +124,7 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
113124
let items = results
114125
.items
115126
.into_iter()
116-
.map(|item| serde_json::from_slice::<T>(&item))
127+
.map(|item| serde_json::from_str::<T>(item.get()))
117128
.collect::<Result<Vec<_>, _>>()?;
118129

119130
// TODO: Provide a continuation token.
@@ -122,7 +133,7 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
122133

123134
// No items, so make any requests we need to make and provide them to the pipeline.
124135
for request in results.requests {
125-
let mut query_request = self.base_request.clone();
136+
let mut query_request = base_request.clone();
126137
query_request.insert_header(
127138
constants::PARTITION_KEY_RANGE_ID,
128139
request.partition_key_range_id.clone(),

sdk/cosmos/azure_data_cosmos/src/query/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub use engine::*;
6868
pub struct Query {
6969
/// The query text itself.
7070
#[serde(rename = "query")]
71-
text: String,
71+
pub(crate) text: String,
7272

7373
/// A list of parameters used in the query and their associated value.
7474
#[serde(skip_serializing_if = "Vec::is_empty")] // Don't serialize an empty array.

sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{collections::VecDeque, sync::Mutex};
66
use serde::{Deserialize, Serialize};
77

88
use azure_data_cosmos::query::{PipelineResult, QueryEngine, QueryPipeline};
9+
use serde_json::value::RawValue;
910

1011
#[derive(Deserialize)]
1112
#[serde(rename_all = "camelCase")]
@@ -104,10 +105,10 @@ impl PartitionState {
104105
self.next_continuation = continuation;
105106
}
106107

107-
pub fn pop_item(&mut self) -> azure_core::Result<Option<Vec<u8>>> {
108+
pub fn pop_item(&mut self) -> azure_core::Result<Option<Box<RawValue>>> {
108109
match self.queue.pop_front() {
109110
Some(item) => {
110-
let item = serde_json::to_vec(&item)?;
111+
let item = serde_json::value::to_raw_value(&item)?;
111112
Ok(Some(item))
112113
}
113114
None => Ok(None),

0 commit comments

Comments
 (0)