Skip to content

Commit e237068

Browse files
authored
fix #2616 by using RawValue, and fix query rewriting
1 parent 5dfa923 commit e237068

File tree

5 files changed

+37
-11
lines changed

5 files changed

+37
-11
lines changed

sdk/cosmos/azure_data_cosmos/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ workspace = true
3838
[features]
3939
default = ["hmac_rust"]
4040
key_auth = [] # Enables support for key-based authentication (Primary Keys and Resource Tokens)
41-
preview_query_engine = [] # Enables support for the PREVIEW external query engine
41+
preview_query_engine = ["serde_json/raw_value"] # Enables support for the PREVIEW external query engine
4242
hmac_rust = ["azure_core/hmac_rust"]
4343
hmac_openssl = ["azure_core/hmac_openssl"]
4444

sdk/cosmos/azure_data_cosmos/src/query/engine.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub struct PipelineResult {
2020
pub is_completed: bool,
2121

2222
/// The items yielded by the pipeline.
23-
pub items: Vec<Vec<u8>>,
23+
pub items: Vec<Box<serde_json::value::RawValue>>,
2424

2525
/// Additional requests that must be made before the pipeline can continue.
2626
pub requests: Vec<QueryRequest>,

sdk/cosmos/azure_data_cosmos/src/query/executor.rs

+20-7
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ pub struct QueryExecutor<T: DeserializeOwned> {
1515
items_link: ResourceLink,
1616
context: Context<'static>,
1717
query_engine: QueryEngineRef,
18-
base_request: Request,
18+
base_request: Option<Request>,
1919
query: Query,
2020
pipeline: Option<OwnedQueryPipeline>,
2121

@@ -45,7 +45,7 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
4545
items_link,
4646
context,
4747
query_engine,
48-
base_request,
48+
base_request: None,
4949
query,
5050
pipeline: None,
5151
phantom: std::marker::PhantomData,
@@ -69,8 +69,13 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
6969
/// An item to yield, or None if execution is complete.
7070
#[tracing::instrument(skip_all)]
7171
async fn step(&mut self) -> azure_core::Result<Option<FeedPage<T>>> {
72-
let pipeline = match self.pipeline.as_mut() {
73-
Some(pipeline) => pipeline,
72+
let (pipeline, base_request) = match self.pipeline.as_mut() {
73+
Some(pipeline) => (
74+
pipeline,
75+
self.base_request
76+
.as_ref()
77+
.expect("base_request should be set when pipeline is set"),
78+
),
7479
None => {
7580
// Initialize the pipeline.
7681
let query_plan = get_query_plan(
@@ -97,8 +102,16 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
97102
let pipeline =
98103
self.query_engine
99104
.create_pipeline(&self.query.text, &query_plan, &pkranges)?;
105+
self.query.text = pipeline.query().into();
106+
self.base_request = Some(crate::pipeline::create_base_query_request(
107+
self.http_pipeline.url(&self.items_link),
108+
&self.query,
109+
)?);
100110
self.pipeline = Some(pipeline);
101-
self.pipeline.as_mut().expect("we just set it")
111+
(
112+
self.pipeline.as_mut().expect("we just set it"),
113+
self.base_request.as_ref().expect("we just set it"),
114+
)
102115
}
103116
};
104117

@@ -113,7 +126,7 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
113126
let items = results
114127
.items
115128
.into_iter()
116-
.map(|item| serde_json::from_slice::<T>(&item))
129+
.map(|item| serde_json::from_str::<T>(item.get()))
117130
.collect::<Result<Vec<_>, _>>()?;
118131

119132
// TODO: Provide a continuation token.
@@ -122,7 +135,7 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
122135

123136
// No items, so make any requests we need to make and provide them to the pipeline.
124137
for request in results.requests {
125-
let mut query_request = self.base_request.clone();
138+
let mut query_request = base_request.clone();
126139
query_request.insert_header(
127140
constants::PARTITION_KEY_RANGE_ID,
128141
request.partition_key_range_id.clone(),

sdk/cosmos/azure_data_cosmos/src/query/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub use engine::*;
6868
pub struct Query {
6969
/// The query text itself.
7070
#[serde(rename = "query")]
71-
text: String,
71+
pub(crate) text: String,
7272

7373
/// A list of parameters used in the query and their associated value.
7474
#[serde(skip_serializing_if = "Vec::is_empty")] // Don't serialize an empty array.

sdk/cosmos/azure_data_cosmos/tests/framework/query_engine.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{collections::VecDeque, sync::Mutex};
66
use serde::{Deserialize, Serialize};
77

88
use azure_data_cosmos::query::{PipelineResult, QueryEngine, QueryPipeline};
9+
use serde_json::value::RawValue;
910

1011
#[derive(Deserialize)]
1112
#[serde(rename_all = "camelCase")]
@@ -202,7 +203,19 @@ impl QueryPipeline for MockQueryPipeline {
202203
if let Some((index, _)) = state {
203204
// Add this item to the result.
204205
if let Some(item) = self.partitions[index].pop_item()? {
205-
items.push(item);
206+
let s = String::from_utf8(item).map_err(|_| {
207+
azure_core::Error::message(
208+
azure_core::error::ErrorKind::DataConversion,
209+
"item is not valid UTF-8",
210+
)
211+
})?;
212+
let raw_value = RawValue::from_string(s).map_err(|_| {
213+
azure_core::Error::message(
214+
azure_core::error::ErrorKind::DataConversion,
215+
"failed to create RawValue",
216+
)
217+
})?;
218+
items.push(raw_value);
206219
}
207220
} else {
208221
// All partitions are exhausted, or have no items to produce

0 commit comments

Comments
 (0)