From 2015fc5f73620f36dc3731236a6188c131adbf69 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Thu, 20 Mar 2025 13:23:19 +0530 Subject: [PATCH 01/19] feat: Qdrant storage Signed-off-by: Anush008 --- Cargo.toml | 1 + src/ops/storages/mod.rs | 1 + src/ops/storages/qdrant.rs | 193 +++++++++++++++++++++++++++++++++++++ 3 files changed, 195 insertions(+) create mode 100644 src/ops/storages/qdrant.rs diff --git a/Cargo.toml b/Cargo.toml index 83a4086..d433e6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,3 +87,4 @@ hyper-rustls = { version = "0.27.5" } yup-oauth2 = "12.1.0" rustls = { version = "0.23.25" } http-body-util = "0.1.3" +qdrant-client = "1.13.0" diff --git a/src/ops/storages/mod.rs b/src/ops/storages/mod.rs index 26e9103..803b0cf 100644 --- a/src/ops/storages/mod.rs +++ b/src/ops/storages/mod.rs @@ -1 +1,2 @@ pub mod postgres; +pub mod qdrant; diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs new file mode 100644 index 0000000..ede018a --- /dev/null +++ b/src/ops/storages/qdrant.rs @@ -0,0 +1,193 @@ +use std::sync::Arc; + +use crate::base::spec::*; +use crate::ops::sdk::*; +use crate::setup; +use crate::utils::db::ValidIdentifier; +use anyhow::Result; +use derivative::Derivative; +use futures::FutureExt; +use serde::Serialize; + +#[derive(Debug, Deserialize)] +pub struct Spec { + qdrant_url: Option, + collection_name: Option, +} +const BIND_LIMIT: usize = 65535; + +pub struct Executor { + collection_name: ValidIdentifier, + key_fields_schema: Vec, + value_fields_schema: Vec, +} + +impl Executor { + fn new( + collection_name: String, + key_fields_schema: Vec, + value_fields_schema: Vec, + ) -> Result { + let collection_name = ValidIdentifier::try_from(collection_name)?; + Ok(Self { + key_fields_schema, + value_fields_schema, + collection_name, + }) + } +} + +#[async_trait] +impl ExportTargetExecutor for Executor { + async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()> { + let num_parameters = self.key_fields_schema.len() + self.value_fields_schema.len(); + for _upsert_chunk in mutation.upserts.chunks(BIND_LIMIT / num_parameters) {} + + // TODO: Find a way to batch delete. + for _delete_key in mutation.delete_keys.iter() {} + + Ok(()) + } +} + +#[async_trait] +impl QueryTarget for Executor { + async fn search(&self, _query: VectorMatchQuery) -> Result { + Ok(QueryResults { + fields: vec![], + results: vec![], + }) + } +} + +#[derive(Default)] +pub struct Factory {} + + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct TableId { + database_url: Option, + collection_name: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SetupState {} + +#[derive(Derivative)] +#[derivative(Debug)] +pub struct SetupStatusCheck { + #[derivative(Debug = "ignore")] + table_id: TableId, + + desired_state: Option, +} + +impl SetupStatusCheck { + fn new(table_id: TableId, desired_state: Option) -> Self { + Self { + table_id, + desired_state, + } + } +} + +#[async_trait] +impl setup::ResourceSetupStatusCheck for SetupStatusCheck { + type Key = TableId; + type State = SetupState; + + fn describe_resource(&self) -> String { + format!("Qdrant table {}", "TABLE ID") + } + + fn key(&self) -> &Self::Key { + &self.table_id + } + + fn desired_state(&self) -> Option<&Self::State> { + self.desired_state.as_ref() + } + + fn describe_changes(&self) -> Vec { + vec![] + } + + fn change_type(&self) -> setup::SetupChangeType { + setup::SetupChangeType::NoChange + } + + async fn apply_change(&self) -> Result<()> { + Ok(()) + } +} + +impl StorageFactoryBase for Arc { + type Spec = Spec; + type SetupState = SetupState; + type Key = TableId; + + fn name(&self) -> &str { + "Qdrant" + } + + fn build( + self: Arc, + name: String, + target_id: i32, + spec: Spec, + key_fields_schema: Vec, + value_fields_schema: Vec, + storage_options: IndexOptions, + context: Arc, + ) -> Result<( + (TableId, SetupState), + ExecutorFuture<'static, (Arc, Option>)>, + )> { + let _ = storage_options; + let table_id = TableId { + database_url: spec.qdrant_url.clone(), + collection_name: spec.collection_name.unwrap_or_else(|| { + format!("{}__{}__{}", context.flow_instance_name, name, target_id) + }), + }; + let setup_state = SetupState {}; + let collection_name = table_id.collection_name.clone(); + let executors = async move { + let executor = Arc::new(Executor::new( + collection_name, + key_fields_schema, + value_fields_schema, + )?); + let query_target = executor.clone(); + Ok(( + executor as Arc, + Some(query_target as Arc), + )) + }; + Ok(((table_id, setup_state), executors.boxed())) + } + + fn check_setup_status( + &self, + key: TableId, + desired: Option, + existing: setup::CombinedState, + ) -> Result< + impl setup::ResourceSetupStatusCheck + 'static, + > { + let _ = existing; + Ok(SetupStatusCheck::new(key, desired)) + } + + fn will_keep_all_existing_data( + &self, + _name: &str, + _target_id: i32, + desired: &SetupState, + existing: &SetupState, + ) -> Result { + let _ = existing; + let _ = desired; + Ok(true) + } +} From d81497493028f13784bc7b8341277472813bf9e1 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Mon, 24 Mar 2025 13:05:57 +0530 Subject: [PATCH 02/19] feat: Qdrant storage support Signed-off-by: Anush008 --- docs/docs/ops/storages.md | 14 +- examples/pdf_embedding/main.py | 2 +- python/cocoindex/storages.py | 5 + src/ops/interface.rs | 1 + src/ops/registration.rs | 1 + src/ops/storages/qdrant.rs | 331 ++++++++++++++++++++++++++++----- 6 files changed, 305 insertions(+), 49 deletions(-) diff --git a/docs/docs/ops/storages.md b/docs/docs/ops/storages.md index 22614cc..2a9719a 100644 --- a/docs/docs/ops/storages.md +++ b/docs/docs/ops/storages.md @@ -11,6 +11,16 @@ description: CocoIndex Built-in Storages The spec takes the following fields: -* `database_url` (type: `str`, optional): The URL of the Postgres database to use as the internal storage, e.g. `postgres://cocoindex:cocoindex@localhost/cocoindex`. If unspecified, will use the same database as the [internal storage](/docs/core/basics#internal-storage). +* `database_url` (type: `str`, optional): The URL of the Postgres database to use as the internal storage, e.g. `postgres://cocoindex:cocoindex@localhost/cocoindex`. If unspecified, will use the same database as the [internal storage](/docs/core/basics#internal-storage). -* `table_name` (type: `str`, optional): The name of the table to store to. If unspecified, will generate a new automatically. We recommend specifying a name explicitly if you want to directly query the table. It can be omitted if you want to use CocoIndex's query handlers to query the table. +* `table_name` (type: `str`, optional): The name of the table to store to. If unspecified, will generate a new automatically. We recommend specifying a name explicitly if you want to directly query the table. It can be omitted if you want to use CocoIndex's query handlers to query the table. + +## Qdrant + +`Qdrant` exports data to a [Qdrant](https://qdrant.tech/) collection. + +The spec takes the following fields: + +* `qdrant_url` (type: `str`, required): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to . + +* `collection` (type: `str`, required): The name of the collection to export the data to. diff --git a/examples/pdf_embedding/main.py b/examples/pdf_embedding/main.py index 0f7994b..4856100 100644 --- a/examples/pdf_embedding/main.py +++ b/examples/pdf_embedding/main.py @@ -60,7 +60,7 @@ def pdf_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoinde doc_embeddings.export( "doc_embeddings", - cocoindex.storages.Postgres(), + cocoindex.storages.Qdrant(qdrant_url="http://localhost:6333", collection_name="cocoindex"), primary_key_fields=["filename", "location"], vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)]) diff --git a/python/cocoindex/storages.py b/python/cocoindex/storages.py index bd4a4b2..851e4e5 100644 --- a/python/cocoindex/storages.py +++ b/python/cocoindex/storages.py @@ -6,3 +6,8 @@ class Postgres(op.StorageSpec): database_url: str | None = None table_name: str | None = None + +class Qdrant(op.StorageSpec): + """Storage powered by Qdrant - https://qdrant.tech/.""" + + collection_name: str diff --git a/src/ops/interface.rs b/src/ops/interface.rs index e040b6c..71cab9d 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -132,6 +132,7 @@ pub enum ExecutorFactory { ExportTarget(Arc), } +#[derive(Debug)] pub struct VectorMatchQuery { pub vector_field_name: String, pub vector: Vec, diff --git a/src/ops/registration.rs b/src/ops/registration.rs index 5d7678b..4f9c1ea 100644 --- a/src/ops/registration.rs +++ b/src/ops/registration.rs @@ -13,6 +13,7 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result functions::extract_by_llm::Factory.register(registry)?; Arc::new(storages::postgres::Factory::default()).register(registry)?; + Arc::new(storages::qdrant::Factory::default()).register(registry)?; Ok(()) } diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index ede018a..21ac8df 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -1,38 +1,66 @@ +use std::collections::HashMap; +use std::fmt::Display; use std::sync::Arc; use crate::base::spec::*; use crate::ops::sdk::*; use crate::setup; -use crate::utils::db::ValidIdentifier; -use anyhow::Result; +use anyhow::{bail, Result}; use derivative::Derivative; use futures::FutureExt; +use qdrant_client::qdrant::value::Kind; +use qdrant_client::qdrant::vectors_output::VectorsOptions; +use qdrant_client::qdrant::{NamedVectors, PointStruct, UpsertPointsBuilder, Value as QdrantValue}; +use qdrant_client::qdrant::{Query, QueryPointsBuilder, ScoredPoint}; +use qdrant_client::Qdrant; use serde::Serialize; -#[derive(Debug, Deserialize)] +fn key_value_fields_iter<'a>( + key_fields_schema: &[FieldSchema], + key_value: &'a KeyValue, +) -> Result<&'a [KeyValue]> { + let slice = if key_fields_schema.len() == 1 { + std::slice::from_ref(key_value) + } else { + match key_value { + KeyValue::Struct(fields) => fields, + _ => anyhow::bail!("expect struct key value"), + } + }; + Ok(slice) +} + +#[derive(Debug, Deserialize, Clone)] pub struct Spec { - qdrant_url: Option, - collection_name: Option, + collection_name: String, } -const BIND_LIMIT: usize = 65535; pub struct Executor { - collection_name: ValidIdentifier, + client: Qdrant, + collection_name: String, key_fields_schema: Vec, value_fields_schema: Vec, + all_fields: Vec, } impl Executor { fn new( - collection_name: String, + url: &str, + collection_name: &str, key_fields_schema: Vec, value_fields_schema: Vec, ) -> Result { - let collection_name = ValidIdentifier::try_from(collection_name)?; + let all_fields = key_fields_schema + .iter() + .chain(value_fields_schema.iter()) + .cloned() + .collect::>(); Ok(Self { + client: Qdrant::from_url(url).build()?, key_fields_schema, value_fields_schema, - collection_name, + all_fields, + collection_name: collection_name.to_string(), }) } } @@ -40,22 +68,233 @@ impl Executor { #[async_trait] impl ExportTargetExecutor for Executor { async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()> { - let num_parameters = self.key_fields_schema.len() + self.value_fields_schema.len(); - for _upsert_chunk in mutation.upserts.chunks(BIND_LIMIT / num_parameters) {} + let mut points: Vec = Vec::with_capacity(mutation.upserts.len()); + for upsert in mutation.upserts.iter() { + let key_fields = key_value_fields_iter(&self.key_fields_schema, &upsert.key)? + .iter() + .collect(); + let key_fields = parse_key_fields(&key_fields, &self.key_fields_schema)?; + let (mut payload, vectors) = + parse_value_fields(&upsert.value.fields, &self.value_fields_schema)?; + payload.extend(key_fields); - // TODO: Find a way to batch delete. - for _delete_key in mutation.delete_keys.iter() {} + points.push(PointStruct::new(1, vectors, payload)); + } + self.client + .upsert_points(UpsertPointsBuilder::new(&self.collection_name, points)) + .await?; Ok(()) } } +fn parse_key_fields( + key_fields: &Vec<&KeyValue>, + schema: &Vec, +) -> Result> { + let mut payload = HashMap::with_capacity(key_fields.len()); + + for (key_value, field_schema) in key_fields.iter().zip(schema.iter()) { + let value = match key_value { + KeyValue::Bytes(v) => QdrantValue { + kind: Some(Kind::StringValue(String::from_utf8_lossy(v).into_owned())), + }, + KeyValue::Str(v) => QdrantValue { + kind: Some(Kind::StringValue(v.clone().to_string())), + }, + KeyValue::Bool(v) => QdrantValue { + kind: Some(Kind::BoolValue(*v)), + }, + KeyValue::Int64(v) => QdrantValue { + kind: Some(Kind::IntegerValue(*v)), + }, + e => anyhow::bail!("Unsupported key value type {}", e), + }; + + payload.insert(field_schema.name.clone(), value); + } + + Ok(payload) +} + +fn parse_value_fields( + value_fields: &Vec, + schema: &Vec, +) -> Result<(HashMap, NamedVectors)> { + let mut payload = HashMap::with_capacity(value_fields.len()); + let mut vectors = NamedVectors::default(); + + for (value, field_schema) in value_fields.iter().zip(schema.iter()) { + let field_name = &field_schema.name; + match value { + Value::Basic(basic_value) => match basic_value { + BasicValue::Bytes(v) => insert_qdrant_value( + &mut payload, + field_name, + Kind::StringValue(String::from_utf8_lossy(v).into_owned()), + ), + BasicValue::Str(v) => insert_qdrant_value( + &mut payload, + field_name, + Kind::StringValue(v.clone().to_string()), + ), + BasicValue::Bool(v) => { + insert_qdrant_value(&mut payload, field_name, Kind::BoolValue(*v)) + } + BasicValue::Int64(v) => { + insert_qdrant_value(&mut payload, field_name, Kind::IntegerValue(*v)) + } + BasicValue::Float32(v) => { + insert_qdrant_value(&mut payload, field_name, Kind::DoubleValue(*v as f64)) + } + BasicValue::Float64(v) => { + insert_qdrant_value(&mut payload, field_name, Kind::DoubleValue(*v)) + } + BasicValue::Range(v) => insert_qdrant_value( + &mut payload, + field_name, + Kind::StringValue(format!("[{}, {})", v.start, v.end)), + ), + BasicValue::Vector(v) => { + let vector = convert_to_vector(v.to_vec()); + vectors = vectors.add_vector(field_name, vector); + } + _ => { + bail!("Unsupported BasicValue type in Value::Basic"); + } + }, + Value::Null => { + payload.insert(field_schema.name.clone(), QdrantValue { kind: None }); + } + _ => { + bail!("Unsupported Value variant: {:?}", value); + } + } + } + + Ok((payload, vectors)) +} + +fn insert_qdrant_value(payload: &mut HashMap, field_name: &str, kind: Kind) { + payload.insert(field_name.to_string(), QdrantValue { kind: Some(kind) }); +} + +fn convert_to_vector(v: Vec) -> Vec { + v.iter() + .filter_map(|elem| match elem { + BasicValue::Float32(f) => Some(*f), + BasicValue::Float64(f) => Some(*f as f32), + BasicValue::Int64(i) => Some(*i as f32), + _ => None, + }) + .collect() +} + +fn into_value(point: &ScoredPoint, schema: &FieldSchema) -> Result { + let field_name = &schema.name; + let typ = schema.value_type.typ.clone(); + let value = match typ { + ValueType::Basic(basic_type) => { + let basic_value = match basic_type { + BasicValueType::Str => point.payload.get(field_name).and_then(|v| { + v.as_str() + .map(|s| BasicValue::Str(Arc::from(s.to_string()))) + }), + BasicValueType::Bool => point + .payload + .get(field_name) + .and_then(|v| v.as_bool().map(BasicValue::Bool)), + + BasicValueType::Int64 => point + .payload + .get(field_name) + .and_then(|v| v.as_integer().map(BasicValue::Int64)), + + BasicValueType::Float32 => point + .payload + .get(field_name) + .and_then(|v| v.as_double().map(|f| BasicValue::Float32(f as f32))), + + BasicValueType::Float64 => point + .payload + .get(field_name) + .and_then(|v| v.as_double().map(BasicValue::Float64)), + + BasicValueType::Json => point + .payload + .get(field_name) + .map(|v| BasicValue::Json(Arc::from(v.clone().into_json()))), + + BasicValueType::Vector(_) => { + let vectors_options = point.vectors.clone().unwrap().vectors_options.unwrap(); + + match vectors_options { + VectorsOptions::Vector(vector) => { + let x = vector + .data + .into_iter() + .map(BasicValue::Float32) + .collect::>(); + Some(BasicValue::Vector(Arc::from(x))) + } + VectorsOptions::Vectors(vectors) => { + let vector = vectors.vectors[field_name].clone(); + let x = vector + .data + .into_iter() + .map(BasicValue::Float32) + .collect::>(); + Some(BasicValue::Vector(Arc::from(x))) + } + } + } + _ => { + anyhow::bail!("Unsupported value type") + } + }; + basic_value.map(Value::Basic) + } + _ => point + .payload + .get(field_name) + .map(|v| Value::from_json(v.clone().into_json(), &typ)) + .transpose()?, + }; + + let final_value = if let Some(v) = value { v } else { Value::Null }; + Ok(final_value) +} + #[async_trait] impl QueryTarget for Executor { - async fn search(&self, _query: VectorMatchQuery) -> Result { + async fn search(&self, query: VectorMatchQuery) -> Result { + let points = self + .client + .query( + QueryPointsBuilder::new(&self.collection_name) + .query(Query::new_nearest(query.vector)) + .limit(query.limit as u64) + .using(query.vector_field_name) + .with_payload(true), + ) + .await? + .result; + + let results = points + .iter() + .map(|point| { + let score = point.score as f64; + let data = self + .all_fields + .iter() + .map(|schema| into_value(point, schema)) + .collect::>>()?; + Ok(QueryResult { data, score }) + }) + .collect::>>()?; Ok(QueryResults { - fields: vec![], - results: vec![], + fields: self.all_fields.clone(), + results, }) } } @@ -63,13 +302,18 @@ impl QueryTarget for Executor { #[derive(Default)] pub struct Factory {} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] -pub struct TableId { - database_url: Option, +pub struct CollectionId { collection_name: String, } +impl Display for CollectionId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.collection_name)?; + Ok(()) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SetupState {} @@ -77,13 +321,13 @@ pub struct SetupState {} #[derivative(Debug)] pub struct SetupStatusCheck { #[derivative(Debug = "ignore")] - table_id: TableId, + table_id: CollectionId, desired_state: Option, } impl SetupStatusCheck { - fn new(table_id: TableId, desired_state: Option) -> Self { + fn new(table_id: CollectionId, desired_state: Option) -> Self { Self { table_id, desired_state, @@ -93,11 +337,11 @@ impl SetupStatusCheck { #[async_trait] impl setup::ResourceSetupStatusCheck for SetupStatusCheck { - type Key = TableId; + type Key = CollectionId; type State = SetupState; fn describe_resource(&self) -> String { - format!("Qdrant table {}", "TABLE ID") + format!("Qdrant collection {}", self.table_id) } fn key(&self) -> &Self::Key { @@ -124,7 +368,7 @@ impl setup::ResourceSetupStatusCheck for SetupStatusCheck { impl StorageFactoryBase for Arc { type Spec = Spec; type SetupState = SetupState; - type Key = TableId; + type Key = CollectionId; fn name(&self) -> &str { "Qdrant" @@ -132,29 +376,27 @@ impl StorageFactoryBase for Arc { fn build( self: Arc, - name: String, - target_id: i32, + _name: String, + _target_id: i32, spec: Spec, key_fields_schema: Vec, value_fields_schema: Vec, - storage_options: IndexOptions, - context: Arc, + _storage_options: IndexOptions, + _context: Arc, ) -> Result<( - (TableId, SetupState), + (CollectionId, SetupState), ExecutorFuture<'static, (Arc, Option>)>, )> { - let _ = storage_options; - let table_id = TableId { - database_url: spec.qdrant_url.clone(), - collection_name: spec.collection_name.unwrap_or_else(|| { - format!("{}__{}__{}", context.flow_instance_name, name, target_id) - }), + let url = "http://localhost:6334/"; + let collection_name = spec.collection_name; + let table_id = CollectionId { + collection_name: collection_name.to_owned(), }; let setup_state = SetupState {}; - let collection_name = table_id.collection_name.clone(); let executors = async move { let executor = Arc::new(Executor::new( - collection_name, + &url, + &collection_name, key_fields_schema, value_fields_schema, )?); @@ -169,13 +411,12 @@ impl StorageFactoryBase for Arc { fn check_setup_status( &self, - key: TableId, + key: CollectionId, desired: Option, - existing: setup::CombinedState, + _existing: setup::CombinedState, ) -> Result< - impl setup::ResourceSetupStatusCheck + 'static, + impl setup::ResourceSetupStatusCheck + 'static, > { - let _ = existing; Ok(SetupStatusCheck::new(key, desired)) } @@ -183,11 +424,9 @@ impl StorageFactoryBase for Arc { &self, _name: &str, _target_id: i32, - desired: &SetupState, - existing: &SetupState, + _desired: &SetupState, + _existing: &SetupState, ) -> Result { - let _ = existing; - let _ = desired; Ok(true) } } From b0e4a4d8b6af9ee592126c4735d8599acd2f7609 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 26 Mar 2025 13:02:07 +0530 Subject: [PATCH 03/19] chore: review updates Signed-off-by: Anush008 --- docs/docs/ops/storages.md | 8 ++++---- src/ops/storages/qdrant.rs | 14 ++++++-------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/docs/docs/ops/storages.md b/docs/docs/ops/storages.md index 2a9719a..8c56548 100644 --- a/docs/docs/ops/storages.md +++ b/docs/docs/ops/storages.md @@ -11,9 +11,9 @@ description: CocoIndex Built-in Storages The spec takes the following fields: -* `database_url` (type: `str`, optional): The URL of the Postgres database to use as the internal storage, e.g. `postgres://cocoindex:cocoindex@localhost/cocoindex`. If unspecified, will use the same database as the [internal storage](/docs/core/basics#internal-storage). +* `database_url` (type: `str`, optional): The URL of the Postgres database to use as the internal storage, e.g. `postgres://cocoindex:cocoindex@localhost/cocoindex`. If unspecified, will use the same database as the [internal storage](/docs/core/basics#internal-storage). -* `table_name` (type: `str`, optional): The name of the table to store to. If unspecified, will generate a new automatically. We recommend specifying a name explicitly if you want to directly query the table. It can be omitted if you want to use CocoIndex's query handlers to query the table. +* `table_name` (type: `str`, optional): The name of the table to store to. If unspecified, will generate a new automatically. We recommend specifying a name explicitly if you want to directly query the table. It can be omitted if you want to use CocoIndex's query handlers to query the table. ## Qdrant @@ -21,6 +21,6 @@ The spec takes the following fields: The spec takes the following fields: -* `qdrant_url` (type: `str`, required): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to . +* `qdrant_url` (type: `str`, required): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to http://localhost:6334/. -* `collection` (type: `str`, required): The name of the collection to export the data to. +* `collection` (type: `str`, required): The name of the collection to export the data to. diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 21ac8df..3ee63c1 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -70,12 +70,10 @@ impl ExportTargetExecutor for Executor { async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()> { let mut points: Vec = Vec::with_capacity(mutation.upserts.len()); for upsert in mutation.upserts.iter() { - let key_fields = key_value_fields_iter(&self.key_fields_schema, &upsert.key)? - .iter() - .collect(); - let key_fields = parse_key_fields(&key_fields, &self.key_fields_schema)?; + let key_fields = key_value_fields_iter(&self.key_fields_schema, &upsert.key)?; + let key_fields = key_values_to_payload(&key_fields, &self.key_fields_schema)?; let (mut payload, vectors) = - parse_value_fields(&upsert.value.fields, &self.value_fields_schema)?; + values_to_payload(&upsert.value.fields, &self.value_fields_schema)?; payload.extend(key_fields); points.push(PointStruct::new(1, vectors, payload)); @@ -88,8 +86,8 @@ impl ExportTargetExecutor for Executor { } } -fn parse_key_fields( - key_fields: &Vec<&KeyValue>, +fn key_values_to_payload( + key_fields: &[KeyValue], schema: &Vec, ) -> Result> { let mut payload = HashMap::with_capacity(key_fields.len()); @@ -117,7 +115,7 @@ fn parse_key_fields( Ok(payload) } -fn parse_value_fields( +fn values_to_payload( value_fields: &Vec, schema: &Vec, ) -> Result<(HashMap, NamedVectors)> { From 678f9fd7d3fff1cbe1251aa7510a687a5c7bcb2d Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 26 Mar 2025 13:15:37 +0530 Subject: [PATCH 04/19] chore: New API check_state_compatibility Signed-off-by: Anush008 --- examples/pdf_embedding/main.py | 2 +- src/ops/storages/qdrant.rs | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/examples/pdf_embedding/main.py b/examples/pdf_embedding/main.py index fa77d82..7cb1b0d 100644 --- a/examples/pdf_embedding/main.py +++ b/examples/pdf_embedding/main.py @@ -61,7 +61,7 @@ def pdf_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoinde doc_embeddings.export( "doc_embeddings", - cocoindex.storages.Qdrant(qdrant_url="http://localhost:6333", collection_name="cocoindex"), + cocoindex.storages.Qdrant(collection_name="cocoindex"), primary_key_fields=["id"], vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)]) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 3ee63c1..c106e23 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -375,7 +375,6 @@ impl StorageFactoryBase for Arc { fn build( self: Arc, _name: String, - _target_id: i32, spec: Spec, key_fields_schema: Vec, value_fields_schema: Vec, @@ -385,6 +384,7 @@ impl StorageFactoryBase for Arc { (CollectionId, SetupState), ExecutorFuture<'static, (Arc, Option>)>, )> { + // TODO(Anush008): Add as a field to the Spec let url = "http://localhost:6334/"; let collection_name = spec.collection_name; let table_id = CollectionId { @@ -418,13 +418,11 @@ impl StorageFactoryBase for Arc { Ok(SetupStatusCheck::new(key, desired)) } - fn will_keep_all_existing_data( + fn check_state_compatibility( &self, - _name: &str, - _target_id: i32, _desired: &SetupState, _existing: &SetupState, - ) -> Result { - Ok(true) + ) -> Result { + Ok(SetupStateCompatibility::Compatible) } } From f857b83c6bdb3da8a1841fb15d40e4925b845a78 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Thu, 27 Mar 2025 12:45:48 +0530 Subject: [PATCH 05/19] refactor: Simplify payload conversion Signed-off-by: Anush008 --- src/ops/storages/qdrant.rs | 100 +++++++++++++------------------------ 1 file changed, 36 insertions(+), 64 deletions(-) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index c106e23..e83c07f 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -8,12 +8,14 @@ use crate::setup; use anyhow::{bail, Result}; use derivative::Derivative; use futures::FutureExt; -use qdrant_client::qdrant::value::Kind; use qdrant_client::qdrant::vectors_output::VectorsOptions; -use qdrant_client::qdrant::{NamedVectors, PointStruct, UpsertPointsBuilder, Value as QdrantValue}; +use qdrant_client::qdrant::{ + NamedVectors, PointStruct, UpsertPointsBuilder, Value as QdrantValue, +}; use qdrant_client::qdrant::{Query, QueryPointsBuilder, ScoredPoint}; use qdrant_client::Qdrant; use serde::Serialize; +use serde_json::json; fn key_value_fields_iter<'a>( key_fields_schema: &[FieldSchema], @@ -71,7 +73,7 @@ impl ExportTargetExecutor for Executor { let mut points: Vec = Vec::with_capacity(mutation.upserts.len()); for upsert in mutation.upserts.iter() { let key_fields = key_value_fields_iter(&self.key_fields_schema, &upsert.key)?; - let key_fields = key_values_to_payload(&key_fields, &self.key_fields_schema)?; + let key_fields = key_values_to_payload(key_fields, &self.key_fields_schema)?; let (mut payload, vectors) = values_to_payload(&upsert.value.fields, &self.value_fields_schema)?; payload.extend(key_fields); @@ -93,23 +95,16 @@ fn key_values_to_payload( let mut payload = HashMap::with_capacity(key_fields.len()); for (key_value, field_schema) in key_fields.iter().zip(schema.iter()) { - let value = match key_value { - KeyValue::Bytes(v) => QdrantValue { - kind: Some(Kind::StringValue(String::from_utf8_lossy(v).into_owned())), - }, - KeyValue::Str(v) => QdrantValue { - kind: Some(Kind::StringValue(v.clone().to_string())), - }, - KeyValue::Bool(v) => QdrantValue { - kind: Some(Kind::BoolValue(*v)), - }, - KeyValue::Int64(v) => QdrantValue { - kind: Some(Kind::IntegerValue(*v)), - }, - e => anyhow::bail!("Unsupported key value type {}", e), + let json_value = match key_value { + KeyValue::Bytes(v) => String::from_utf8_lossy(v).into(), + KeyValue::Str(v) => v.to_string().into(), + KeyValue::Bool(v) => (*v).into(), + KeyValue::Int64(v) => (*v).into(), + KeyValue::Uuid(v) => v.to_string().into(), + KeyValue::Range(v) => json!({ "start": v.start, "end": v.end }), + _ => bail!("Unsupported key value type"), }; - - payload.insert(field_schema.name.clone(), value); + payload.insert(field_schema.name.clone(), json_value.into()); } Ok(payload) @@ -124,59 +119,36 @@ fn values_to_payload( for (value, field_schema) in value_fields.iter().zip(schema.iter()) { let field_name = &field_schema.name; + match value { - Value::Basic(basic_value) => match basic_value { - BasicValue::Bytes(v) => insert_qdrant_value( - &mut payload, - field_name, - Kind::StringValue(String::from_utf8_lossy(v).into_owned()), - ), - BasicValue::Str(v) => insert_qdrant_value( - &mut payload, - field_name, - Kind::StringValue(v.clone().to_string()), - ), - BasicValue::Bool(v) => { - insert_qdrant_value(&mut payload, field_name, Kind::BoolValue(*v)) - } - BasicValue::Int64(v) => { - insert_qdrant_value(&mut payload, field_name, Kind::IntegerValue(*v)) - } - BasicValue::Float32(v) => { - insert_qdrant_value(&mut payload, field_name, Kind::DoubleValue(*v as f64)) - } - BasicValue::Float64(v) => { - insert_qdrant_value(&mut payload, field_name, Kind::DoubleValue(*v)) - } - BasicValue::Range(v) => insert_qdrant_value( - &mut payload, - field_name, - Kind::StringValue(format!("[{}, {})", v.start, v.end)), - ), - BasicValue::Vector(v) => { - let vector = convert_to_vector(v.to_vec()); - vectors = vectors.add_vector(field_name, vector); - } - _ => { - bail!("Unsupported BasicValue type in Value::Basic"); - } - }, - Value::Null => { - payload.insert(field_schema.name.clone(), QdrantValue { kind: None }); + Value::Basic(basic_value) => { + let json_value = match basic_value { + BasicValue::Bytes(v) => String::from_utf8_lossy(v).into(), + BasicValue::Str(v) => v.clone().to_string().into(), + BasicValue::Bool(v) => (*v).into(), + BasicValue::Int64(v) => (*v).into(), + BasicValue::Float32(v) => (*v as f64).into(), + BasicValue::Float64(v) => (*v).into(), + BasicValue::Range(v) => json!({ "start": v.start, "end": v.end }), + BasicValue::Vector(v) => { + let vector = convert_to_vector(v.to_vec()); + vectors = vectors.add_vector(field_name, vector); + continue; + } + _ => bail!("Unsupported BasicValue type in Value::Basic"), + }; + payload.insert(field_name.clone(), json_value.into()); } - _ => { - bail!("Unsupported Value variant: {:?}", value); + Value::Null => { + payload.insert(field_name.clone(), QdrantValue { kind: None }); } + _ => bail!("Unsupported Value variant: {:?}", value), } } Ok((payload, vectors)) } -fn insert_qdrant_value(payload: &mut HashMap, field_name: &str, kind: Kind) { - payload.insert(field_name.to_string(), QdrantValue { kind: Some(kind) }); -} - fn convert_to_vector(v: Vec) -> Vec { v.iter() .filter_map(|elem| match elem { @@ -393,7 +365,7 @@ impl StorageFactoryBase for Arc { let setup_state = SetupState {}; let executors = async move { let executor = Arc::new(Executor::new( - &url, + url, &collection_name, key_fields_schema, value_fields_schema, From 4a0bdf5b066945fdf1a5d0f9ccb8cc4bf1698e3c Mon Sep 17 00:00:00 2001 From: Anush008 Date: Tue, 8 Apr 2025 10:14:38 +0530 Subject: [PATCH 06/19] refactor: No ResourceSetupStatusCheck Signed-off-by: Anush008 --- src/ops/storages/qdrant.rs | 92 +++++++++----------------------------- 1 file changed, 20 insertions(+), 72 deletions(-) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index e83c07f..2a09a74 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::convert::Infallible; use std::fmt::Display; use std::sync::Arc; @@ -6,12 +7,9 @@ use crate::base::spec::*; use crate::ops::sdk::*; use crate::setup; use anyhow::{bail, Result}; -use derivative::Derivative; use futures::FutureExt; use qdrant_client::qdrant::vectors_output::VectorsOptions; -use qdrant_client::qdrant::{ - NamedVectors, PointStruct, UpsertPointsBuilder, Value as QdrantValue, -}; +use qdrant_client::qdrant::{NamedVectors, PointStruct, UpsertPointsBuilder, Value as QdrantValue}; use qdrant_client::qdrant::{Query, QueryPointsBuilder, ScoredPoint}; use qdrant_client::Qdrant; use serde::Serialize; @@ -90,7 +88,7 @@ impl ExportTargetExecutor for Executor { fn key_values_to_payload( key_fields: &[KeyValue], - schema: &Vec, + schema: &[FieldSchema], ) -> Result> { let mut payload = HashMap::with_capacity(key_fields.len()); @@ -111,8 +109,8 @@ fn key_values_to_payload( } fn values_to_payload( - value_fields: &Vec, - schema: &Vec, + value_fields: &[Value], + schema: &[FieldSchema], ) -> Result<(HashMap, NamedVectors)> { let mut payload = HashMap::with_capacity(value_fields.len()); let mut vectors = NamedVectors::default(); @@ -287,58 +285,10 @@ impl Display for CollectionId { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SetupState {} -#[derive(Derivative)] -#[derivative(Debug)] -pub struct SetupStatusCheck { - #[derivative(Debug = "ignore")] - table_id: CollectionId, - - desired_state: Option, -} - -impl SetupStatusCheck { - fn new(table_id: CollectionId, desired_state: Option) -> Self { - Self { - table_id, - desired_state, - } - } -} - -#[async_trait] -impl setup::ResourceSetupStatusCheck for SetupStatusCheck { - type Key = CollectionId; - type State = SetupState; - - fn describe_resource(&self) -> String { - format!("Qdrant collection {}", self.table_id) - } - - fn key(&self) -> &Self::Key { - &self.table_id - } - - fn desired_state(&self) -> Option<&Self::State> { - self.desired_state.as_ref() - } - - fn describe_changes(&self) -> Vec { - vec![] - } - - fn change_type(&self) -> setup::SetupChangeType { - setup::SetupChangeType::NoChange - } - - async fn apply_change(&self) -> Result<()> { - Ok(()) - } -} - impl StorageFactoryBase for Arc { type Spec = Spec; type SetupState = SetupState; - type Key = CollectionId; + type Key = String; fn name(&self) -> &str { "Qdrant" @@ -352,21 +302,15 @@ impl StorageFactoryBase for Arc { value_fields_schema: Vec, _storage_options: IndexOptions, _context: Arc, - ) -> Result<( - (CollectionId, SetupState), - ExecutorFuture<'static, (Arc, Option>)>, - )> { + ) -> Result> { // TODO(Anush008): Add as a field to the Spec let url = "http://localhost:6334/"; - let collection_name = spec.collection_name; - let table_id = CollectionId { - collection_name: collection_name.to_owned(), - }; + let collection_name = spec.collection_name.clone(); let setup_state = SetupState {}; let executors = async move { let executor = Arc::new(Executor::new( url, - &collection_name, + &spec.collection_name.clone(), key_fields_schema, value_fields_schema, )?); @@ -376,18 +320,22 @@ impl StorageFactoryBase for Arc { Some(query_target as Arc), )) }; - Ok(((table_id, setup_state), executors.boxed())) + Ok(ExportTargetBuildOutput { + executor: executors.boxed(), + setup_key: collection_name, + desired_setup_state: setup_state, + }) } fn check_setup_status( &self, - key: CollectionId, - desired: Option, + _key: String, + _desired: Option, _existing: setup::CombinedState, - ) -> Result< - impl setup::ResourceSetupStatusCheck + 'static, - > { - Ok(SetupStatusCheck::new(key, desired)) + ) -> Result + 'static> { + Err(anyhow!( + "Set `setup_by_user` to `true` to use Qdrant storage" + )) as Result } fn check_state_compatibility( From a4c6cae1fed7a2774bded749eaa068a74d5cf9d5 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Tue, 8 Apr 2025 10:37:44 +0530 Subject: [PATCH 07/19] refactor: Replaced SetupState with () Signed-off-by: Anush008 --- src/ops/storages/qdrant.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 2a09a74..ad859cd 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -282,12 +282,9 @@ impl Display for CollectionId { } } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SetupState {} - impl StorageFactoryBase for Arc { type Spec = Spec; - type SetupState = SetupState; + type SetupState = (); type Key = String; fn name(&self) -> &str { @@ -306,7 +303,6 @@ impl StorageFactoryBase for Arc { // TODO(Anush008): Add as a field to the Spec let url = "http://localhost:6334/"; let collection_name = spec.collection_name.clone(); - let setup_state = SetupState {}; let executors = async move { let executor = Arc::new(Executor::new( url, @@ -323,16 +319,16 @@ impl StorageFactoryBase for Arc { Ok(ExportTargetBuildOutput { executor: executors.boxed(), setup_key: collection_name, - desired_setup_state: setup_state, + desired_setup_state: (), }) } fn check_setup_status( &self, _key: String, - _desired: Option, - _existing: setup::CombinedState, - ) -> Result + 'static> { + _desired: Option<()>, + _existing: setup::CombinedState<()>, + ) -> Result + 'static> { Err(anyhow!( "Set `setup_by_user` to `true` to use Qdrant storage" )) as Result @@ -340,8 +336,8 @@ impl StorageFactoryBase for Arc { fn check_state_compatibility( &self, - _desired: &SetupState, - _existing: &SetupState, + _desired: &(), + _existing: &(), ) -> Result { Ok(SetupStateCompatibility::Compatible) } From 67f1e714daaeb5659a9b54de84c72be94af433c9 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Tue, 8 Apr 2025 11:57:42 +0530 Subject: [PATCH 08/19] feat: Parse point ID Signed-off-by: Anush008 --- src/ops/storages/qdrant.rs | 58 ++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index ad859cd..59e9e49 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -9,11 +9,14 @@ use crate::setup; use anyhow::{bail, Result}; use futures::FutureExt; use qdrant_client::qdrant::vectors_output::VectorsOptions; -use qdrant_client::qdrant::{NamedVectors, PointStruct, UpsertPointsBuilder, Value as QdrantValue}; +use qdrant_client::qdrant::{ + NamedVectors, PointId, PointStruct, UpsertPointsBuilder, Value as QdrantValue, +}; use qdrant_client::qdrant::{Query, QueryPointsBuilder, ScoredPoint}; use qdrant_client::Qdrant; use serde::Serialize; use serde_json::json; +use uuid::Uuid; fn key_value_fields_iter<'a>( key_fields_schema: &[FieldSchema], @@ -45,8 +48,8 @@ pub struct Executor { impl Executor { fn new( - url: &str, - collection_name: &str, + url: String, + collection_name: String, key_fields_schema: Vec, value_fields_schema: Vec, ) -> Result { @@ -56,11 +59,11 @@ impl Executor { .cloned() .collect::>(); Ok(Self { - client: Qdrant::from_url(url).build()?, + client: Qdrant::from_url(&url).build()?, key_fields_schema, value_fields_schema, all_fields, - collection_name: collection_name.to_string(), + collection_name, }) } } @@ -71,12 +74,11 @@ impl ExportTargetExecutor for Executor { let mut points: Vec = Vec::with_capacity(mutation.upserts.len()); for upsert in mutation.upserts.iter() { let key_fields = key_value_fields_iter(&self.key_fields_schema, &upsert.key)?; - let key_fields = key_values_to_payload(key_fields, &self.key_fields_schema)?; - let (mut payload, vectors) = + let point_id = key_to_point_id(key_fields)?; + let (payload, vectors) = values_to_payload(&upsert.value.fields, &self.value_fields_schema)?; - payload.extend(key_fields); - points.push(PointStruct::new(1, vectors, payload)); + points.push(PointStruct::new(point_id, vectors, payload)); } self.client @@ -85,27 +87,20 @@ impl ExportTargetExecutor for Executor { Ok(()) } } +fn key_to_point_id(key_values: &[KeyValue]) -> Result { + let point_id = if let Some(key_value) = key_values.first() { + match key_value { + KeyValue::Str(v) => PointId::from(v.to_string()), + KeyValue::Int64(v) => PointId::from(*v as u64), + KeyValue::Uuid(v) => PointId::from(v.to_string()), + _ => bail!("Unsupported Qdrant Point ID key type"), + } + } else { + let uuid = Uuid::new_v4().to_string(); + PointId::from(uuid) + }; -fn key_values_to_payload( - key_fields: &[KeyValue], - schema: &[FieldSchema], -) -> Result> { - let mut payload = HashMap::with_capacity(key_fields.len()); - - for (key_value, field_schema) in key_fields.iter().zip(schema.iter()) { - let json_value = match key_value { - KeyValue::Bytes(v) => String::from_utf8_lossy(v).into(), - KeyValue::Str(v) => v.to_string().into(), - KeyValue::Bool(v) => (*v).into(), - KeyValue::Int64(v) => (*v).into(), - KeyValue::Uuid(v) => v.to_string().into(), - KeyValue::Range(v) => json!({ "start": v.start, "end": v.end }), - _ => bail!("Unsupported key value type"), - }; - payload.insert(field_schema.name.clone(), json_value.into()); - } - - Ok(payload) + Ok(point_id) } fn values_to_payload( @@ -303,10 +298,11 @@ impl StorageFactoryBase for Arc { // TODO(Anush008): Add as a field to the Spec let url = "http://localhost:6334/"; let collection_name = spec.collection_name.clone(); + let executors = async move { let executor = Arc::new(Executor::new( - url, - &spec.collection_name.clone(), + url.to_string(), + spec.collection_name.clone(), key_fields_schema, value_fields_schema, )?); From d7f419b1e87a6d135d7996f31acd94a3d3a6c1f4 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Sun, 13 Apr 2025 12:36:41 +0530 Subject: [PATCH 09/19] chore: Support all Value::Basic values Signed-off-by: Anush008 --- src/ops/storages/qdrant.rs | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 59e9e49..61e0a2c 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -82,7 +82,7 @@ impl ExportTargetExecutor for Executor { } self.client - .upsert_points(UpsertPointsBuilder::new(&self.collection_name, points)) + .upsert_points(UpsertPointsBuilder::new(&self.collection_name, points).wait(true)) .await?; Ok(()) } @@ -115,7 +115,7 @@ fn values_to_payload( match value { Value::Basic(basic_value) => { - let json_value = match basic_value { + let json_value: serde_json::Value = match basic_value { BasicValue::Bytes(v) => String::from_utf8_lossy(v).into(), BasicValue::Str(v) => v.clone().to_string().into(), BasicValue::Bool(v) => (*v).into(), @@ -123,12 +123,17 @@ fn values_to_payload( BasicValue::Float32(v) => (*v as f64).into(), BasicValue::Float64(v) => (*v).into(), BasicValue::Range(v) => json!({ "start": v.start, "end": v.end }), + BasicValue::Uuid(v) => v.to_string().into(), + BasicValue::Date(v) => v.to_string().into(), + BasicValue::LocalDateTime(v) => v.to_string().into(), + BasicValue::Time(v) => v.to_string().into(), + BasicValue::OffsetDateTime(v) => v.to_string().into(), + BasicValue::Json(v) => (**v).clone(), BasicValue::Vector(v) => { let vector = convert_to_vector(v.to_vec()); vectors = vectors.add_vector(field_name, vector); continue; } - _ => bail!("Unsupported BasicValue type in Value::Basic"), }; payload.insert(field_name.clone(), json_value.into()); } @@ -296,6 +301,14 @@ impl StorageFactoryBase for Arc { _context: Arc, ) -> Result> { // TODO(Anush008): Add as a field to the Spec + + if key_fields_schema.len() > 1 { + api_bail!( + "Expected only one primary key for the point ID. Got {}.", + key_fields_schema.len() + ) + } + let url = "http://localhost:6334/"; let collection_name = spec.collection_name.clone(); @@ -324,7 +337,8 @@ impl StorageFactoryBase for Arc { _key: String, _desired: Option<()>, _existing: setup::CombinedState<()>, - ) -> Result + 'static> { + _auth_registry: &Arc, + ) -> Result { Err(anyhow!( "Set `setup_by_user` to `true` to use Qdrant storage" )) as Result @@ -337,4 +351,8 @@ impl StorageFactoryBase for Arc { ) -> Result { Ok(SetupStateCompatibility::Compatible) } + + fn describe_resource(&self, key: &String) -> Result { + Ok(format!("Qdrant collection {}", key)) + } } From 379797c49529624f95b15bf9e8e221d75b281e77 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Sun, 13 Apr 2025 13:34:09 +0530 Subject: [PATCH 10/19] chore: Handle all BasicValue types in search(), doc updates Signed-off-by: Anush008 --- docs/docs/ops/storages.md | 6 ++-- python/cocoindex/storages.py | 1 + src/base/value.rs | 5 +-- src/ops/storages/qdrant.rs | 67 ++++++++++++++++++++++++------------ 4 files changed, 50 insertions(+), 29 deletions(-) diff --git a/docs/docs/ops/storages.md b/docs/docs/ops/storages.md index 8c56548..0807128 100644 --- a/docs/docs/ops/storages.md +++ b/docs/docs/ops/storages.md @@ -7,7 +7,7 @@ description: CocoIndex Built-in Storages ## Postgres -`Postgres` exports data to Postgres database (with pgvector extension). +Exports data to Postgres database (with pgvector extension). The spec takes the following fields: @@ -17,10 +17,10 @@ The spec takes the following fields: ## Qdrant -`Qdrant` exports data to a [Qdrant](https://qdrant.tech/) collection. +Exports data to a [Qdrant](https://qdrant.tech/) collection. The spec takes the following fields: -* `qdrant_url` (type: `str`, required): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to http://localhost:6334/. +* `qdrant_url` (type: `str`, required): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to `http://localhost:6334/`. * `collection` (type: `str`, required): The name of the collection to export the data to. diff --git a/python/cocoindex/storages.py b/python/cocoindex/storages.py index 67be942..1a4549f 100644 --- a/python/cocoindex/storages.py +++ b/python/cocoindex/storages.py @@ -16,6 +16,7 @@ class Qdrant(op.StorageSpec): """Storage powered by Qdrant - https://qdrant.tech/.""" collection_name: str + grpc_url: str = "http://localhost:6334/" @dataclass class Neo4jConnectionSpec: diff --git a/src/base/value.rs b/src/base/value.rs index 5a754f8..122151c 100644 --- a/src/base/value.rs +++ b/src/base/value.rs @@ -174,10 +174,7 @@ impl std::fmt::Display for KeyValue { } impl KeyValue { - pub fn fields_iter<'a>( - &'a self, - num_fields: usize, - ) -> Result> { + pub fn fields_iter(&self, num_fields: usize) -> Result> { let slice = if num_fields == 1 { std::slice::from_ref(self) } else { diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 61e0a2c..db427a2 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -36,6 +36,7 @@ fn key_value_fields_iter<'a>( #[derive(Debug, Deserialize, Clone)] pub struct Spec { collection_name: String, + grpc_url: String, } pub struct Executor { @@ -193,29 +194,53 @@ fn into_value(point: &ScoredPoint, schema: &FieldSchema) -> Result { .get(field_name) .map(|v| BasicValue::Json(Arc::from(v.clone().into_json()))), - BasicValueType::Vector(_) => { - let vectors_options = point.vectors.clone().unwrap().vectors_options.unwrap(); - - match vectors_options { + BasicValueType::Vector(_) => point + .vectors + .as_ref() + .and_then(|v| v.vectors_options.as_ref()) + .and_then(|vectors_options| match vectors_options { VectorsOptions::Vector(vector) => { - let x = vector + let values = vector .data - .into_iter() - .map(BasicValue::Float32) + .iter() + .map(|f| BasicValue::Float32(*f)) .collect::>(); - Some(BasicValue::Vector(Arc::from(x))) + Some(BasicValue::Vector(Arc::from(values))) } VectorsOptions::Vectors(vectors) => { - let vector = vectors.vectors[field_name].clone(); - let x = vector - .data - .into_iter() - .map(BasicValue::Float32) - .collect::>(); - Some(BasicValue::Vector(Arc::from(x))) + vectors.vectors.get(field_name).map(|vector| { + let values = vector + .data + .iter() + .map(|f| BasicValue::Float32(*f)) + .collect::>(); + BasicValue::Vector(Arc::from(values)) + }) } - } - } + }), + + BasicValueType::Date + | BasicValueType::LocalDateTime + | BasicValueType::OffsetDateTime + | BasicValueType::Time + | BasicValueType::Uuid => point.payload.get(field_name).and_then(|v| { + v.as_str() + .map(|s| BasicValue::Str(Arc::from(s.to_string()))) + }), + BasicValueType::Range => point.payload.get(field_name).and_then(|v| { + v.as_struct().and_then(|s| { + let start = s.fields.get("start").and_then(|f| f.as_integer()); + let end = s.fields.get("end").and_then(|f| f.as_integer()); + + match (start, end) { + (Some(start), Some(end)) => Some(BasicValue::Range(RangeValue { + start: start as usize, + end: end as usize, + })), + _ => None, + } + }) + }), _ => { anyhow::bail!("Unsupported value type") } @@ -243,7 +268,8 @@ impl QueryTarget for Executor { .query(Query::new_nearest(query.vector)) .limit(query.limit as u64) .using(query.vector_field_name) - .with_payload(true), + .with_payload(true) + .with_vectors(true), ) .await? .result; @@ -300,8 +326,6 @@ impl StorageFactoryBase for Arc { _storage_options: IndexOptions, _context: Arc, ) -> Result> { - // TODO(Anush008): Add as a field to the Spec - if key_fields_schema.len() > 1 { api_bail!( "Expected only one primary key for the point ID. Got {}.", @@ -309,12 +333,11 @@ impl StorageFactoryBase for Arc { ) } - let url = "http://localhost:6334/"; let collection_name = spec.collection_name.clone(); let executors = async move { let executor = Arc::new(Executor::new( - url.to_string(), + spec.grpc_url, spec.collection_name.clone(), key_fields_schema, value_fields_schema, From a2b58b1b1b5a3cb68e4c90a4a698230d42879bd3 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Sun, 13 Apr 2025 16:37:24 +0530 Subject: [PATCH 11/19] doc: End-to-end example Signed-off-by: Anush008 --- docs/docs/ops/storages.md | 10 +++++-- examples/text_embedding/README.md | 40 ++++++++++++++++++++++---- examples/text_embedding/main.py | 47 +++++++++++++++++++++++-------- 3 files changed, 77 insertions(+), 20 deletions(-) diff --git a/docs/docs/ops/storages.md b/docs/docs/ops/storages.md index 0807128..3d3b748 100644 --- a/docs/docs/ops/storages.md +++ b/docs/docs/ops/storages.md @@ -21,6 +21,12 @@ Exports data to a [Qdrant](https://qdrant.tech/) collection. The spec takes the following fields: -* `qdrant_url` (type: `str`, required): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to `http://localhost:6334/`. +* `grpc_url` (type: `str`, required): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to `http://localhost:6334/`. -* `collection` (type: `str`, required): The name of the collection to export the data to. +* `collection_name` (type: `str`, required): The name of the collection to export the data to. + +The field name for the vector embeddings must match the [vector name](https://qdrant.tech/documentation/concepts/vectors/#named-vectors) used when the collection was created. + +If no primary key is set during export, a random UUID is used as the Qdrant point ID. + +You can find an end-to-end example [here](https://github.com/cocoindex-io/cocoindex/tree/main/examples/text_embedding). diff --git a/examples/text_embedding/README.md b/examples/text_embedding/README.md index 3c6dde7..8a55b52 100644 --- a/examples/text_embedding/README.md +++ b/examples/text_embedding/README.md @@ -1,7 +1,32 @@ -Simple example for cocoindex: build embedding index based on local files. +## Description -## Prerequisite -[Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one. +Example to build a vector index in Qdrant based on local files. + +## Pre-requisites + +- [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one. + +- Run Qdrant. + +```bash +docker run -d -p 6334:6334 -p 6333:6333 qdrant/qdrant +``` + +- [Create a collection](https://qdrant.tech/documentation/concepts/vectors/#named-vectors) to export the embeddings to. + +```bash +curl -X PUT \ + 'http://localhost:6333/collections/cocoindex' \ + --header 'Content-Type: application/json' \ + --data-raw '{ + "vectors": { + "text_embedding": { + "size": 384, + "distance": "Cosine" + } + } +}' +``` ## Run @@ -23,19 +48,22 @@ Update index: python main.py cocoindex update ``` +You can now view the data in the Qdrant dashboard at . + Run: ```bash python main.py ``` -## CocoInsight +## CocoInsight + CocoInsight is in Early Access now (Free) 😊 You found us! A quick 3 minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9). Run CocoInsight to understand your RAG data pipeline: -``` +```bash python main.py cocoindex server -c https://cocoindex.io ``` -Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight). \ No newline at end of file +Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight). diff --git a/examples/text_embedding/main.py b/examples/text_embedding/main.py index 70b3807..57f27a4 100644 --- a/examples/text_embedding/main.py +++ b/examples/text_embedding/main.py @@ -2,6 +2,7 @@ import cocoindex + def text_to_embedding(text: cocoindex.DataSlice) -> cocoindex.DataSlice: """ Embed the text using a SentenceTransformer model. @@ -9,40 +10,61 @@ def text_to_embedding(text: cocoindex.DataSlice) -> cocoindex.DataSlice: """ return text.transform( cocoindex.functions.SentenceTransformerEmbed( - model="sentence-transformers/all-MiniLM-L6-v2")) + model="sentence-transformers/all-MiniLM-L6-v2" + ) + ) + @cocoindex.flow_def(name="TextEmbedding") -def text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): +def text_embedding_flow( + flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope +): """ Define an example flow that embeds text into a vector database. """ data_scope["documents"] = flow_builder.add_source( - cocoindex.sources.LocalFile(path="markdown_files")) + cocoindex.sources.LocalFile(path="markdown_files") + ) doc_embeddings = data_scope.add_collector() with data_scope["documents"].row() as doc: doc["chunks"] = doc["content"].transform( cocoindex.functions.SplitRecursively(), - language="markdown", chunk_size=2000, chunk_overlap=500) + language="markdown", + chunk_size=2000, + chunk_overlap=500, + ) with doc["chunks"].row() as chunk: chunk["embedding"] = text_to_embedding(chunk["text"]) - doc_embeddings.collect(filename=doc["filename"], location=chunk["location"], - text=chunk["text"], embedding=chunk["embedding"]) + doc_embeddings.collect( + id=cocoindex.GeneratedField.UUID, + filename=doc["filename"], + location=chunk["location"], + text=chunk["text"], + # 'text_embedding' is the name of the vector we've created the Qdrant collection with. + text_embedding=chunk["embedding"], + ) doc_embeddings.export( "doc_embeddings", - cocoindex.storages.Postgres(), - primary_key_fields=["filename", "location"], - vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)]) + cocoindex.storages.Qdrant( + collection_name="cocoindex", grpc_url="http://localhost:6334/" + ), + primary_key_fields=["id"], + setup_by_user=True, + ) + query_handler = cocoindex.query.SimpleSemanticsQueryHandler( name="SemanticsSearch", flow=text_embedding_flow, target_name="doc_embeddings", query_transform_flow=text_to_embedding, - default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY) + default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY, +) + @cocoindex.main_fn() def _run(): @@ -50,9 +72,9 @@ def _run(): while True: try: query = input("Enter search query (or Enter to quit): ") - if query == '': + if query == "": break - results, _ = query_handler.search(query, 10) + results, _ = query_handler.search(query, 10, "text_embedding") print("\nSearch results:") for result in results: print(f"[{result.score:.3f}] {result.data['filename']}") @@ -62,6 +84,7 @@ def _run(): except KeyboardInterrupt: break + if __name__ == "__main__": load_dotenv(override=True) _run() From 9dc9abc3b497abff53fe559039c40a5edb3187f3 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Sun, 13 Apr 2025 16:44:40 +0530 Subject: [PATCH 12/19] feat: Support for api_key Signed-off-by: Anush008 --- docs/docs/ops/storages.md | 6 ++++-- python/cocoindex/storages.py | 1 + src/ops/storages/qdrant.rs | 8 +++++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/docs/docs/ops/storages.md b/docs/docs/ops/storages.md index 3d3b748..af75f88 100644 --- a/docs/docs/ops/storages.md +++ b/docs/docs/ops/storages.md @@ -21,10 +21,12 @@ Exports data to a [Qdrant](https://qdrant.tech/) collection. The spec takes the following fields: -* `grpc_url` (type: `str`, required): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to `http://localhost:6334/`. - * `collection_name` (type: `str`, required): The name of the collection to export the data to. +* `grpc_url` (type: `str`, optional): The [gRPC URL](https://qdrant.tech/documentation/interfaces/#grpc-interface) of the Qdrant instance. Defaults to `http://localhost:6334/`. + +* `api_key` (type: `str`, optional). API key to authenticate requests with. + The field name for the vector embeddings must match the [vector name](https://qdrant.tech/documentation/concepts/vectors/#named-vectors) used when the collection was created. If no primary key is set during export, a random UUID is used as the Qdrant point ID. diff --git a/python/cocoindex/storages.py b/python/cocoindex/storages.py index 1a4549f..af1736d 100644 --- a/python/cocoindex/storages.py +++ b/python/cocoindex/storages.py @@ -17,6 +17,7 @@ class Qdrant(op.StorageSpec): collection_name: str grpc_url: str = "http://localhost:6334/" + api_key: str | None = None @dataclass class Neo4jConnectionSpec: diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index db427a2..2a6b494 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -37,6 +37,7 @@ fn key_value_fields_iter<'a>( pub struct Spec { collection_name: String, grpc_url: String, + api_key: Option, } pub struct Executor { @@ -51,6 +52,7 @@ impl Executor { fn new( url: String, collection_name: String, + api_key: Option, key_fields_schema: Vec, value_fields_schema: Vec, ) -> Result { @@ -60,7 +62,10 @@ impl Executor { .cloned() .collect::>(); Ok(Self { - client: Qdrant::from_url(&url).build()?, + client: Qdrant::from_url(&url) + .api_key(api_key) + .skip_compatibility_check() + .build()?, key_fields_schema, value_fields_schema, all_fields, @@ -339,6 +344,7 @@ impl StorageFactoryBase for Arc { let executor = Arc::new(Executor::new( spec.grpc_url, spec.collection_name.clone(), + spec.api_key, key_fields_schema, value_fields_schema, )?); From aa2acda58b1779e844fdb51c3b966fa170766c1b Mon Sep 17 00:00:00 2001 From: Anush008 Date: Sun, 13 Apr 2025 19:04:30 +0530 Subject: [PATCH 13/19] fix: no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point Signed-off-by: Anush008 --- examples/text_embedding/README.md | 4 ++-- src/ops/storages/qdrant.rs | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/text_embedding/README.md b/examples/text_embedding/README.md index 8a55b52..6d66a4f 100644 --- a/examples/text_embedding/README.md +++ b/examples/text_embedding/README.md @@ -28,6 +28,8 @@ curl -X PUT \ }' ``` +You can view the collections and data with the Qdrant dashboard at . + ## Run Install dependencies: @@ -48,8 +50,6 @@ Update index: python main.py cocoindex update ``` -You can now view the data in the Qdrant dashboard at . - Run: ```bash diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 2a6b494..1396df5 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -61,6 +61,12 @@ impl Executor { .chain(value_fields_schema.iter()) .cloned() .collect::>(); + + // Hotfix to resolve + // `no process-level CryptoProvider available -- call CryptoProvider::install_default() before this point` + // when using HTTPS URLs. + let _ = rustls::crypto::ring::default_provider().install_default(); + Ok(Self { client: Qdrant::from_url(&url) .api_key(api_key) From 67e3608248ae37e5e5d2f4e9b212a53382d9b2d6 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Mon, 14 Apr 2025 12:38:46 +0530 Subject: [PATCH 14/19] chore: Removed key_value_fields_iter() Signed-off-by: Anush008 --- src/ops/storages/qdrant.rs | 42 ++++++++------------------------------ 1 file changed, 9 insertions(+), 33 deletions(-) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 1396df5..32791a9 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -16,22 +16,6 @@ use qdrant_client::qdrant::{Query, QueryPointsBuilder, ScoredPoint}; use qdrant_client::Qdrant; use serde::Serialize; use serde_json::json; -use uuid::Uuid; - -fn key_value_fields_iter<'a>( - key_fields_schema: &[FieldSchema], - key_value: &'a KeyValue, -) -> Result<&'a [KeyValue]> { - let slice = if key_fields_schema.len() == 1 { - std::slice::from_ref(key_value) - } else { - match key_value { - KeyValue::Struct(fields) => fields, - _ => anyhow::bail!("expect struct key value"), - } - }; - Ok(slice) -} #[derive(Debug, Deserialize, Clone)] pub struct Spec { @@ -43,7 +27,6 @@ pub struct Spec { pub struct Executor { client: Qdrant, collection_name: String, - key_fields_schema: Vec, value_fields_schema: Vec, all_fields: Vec, } @@ -72,7 +55,6 @@ impl Executor { .api_key(api_key) .skip_compatibility_check() .build()?, - key_fields_schema, value_fields_schema, all_fields, collection_name, @@ -85,8 +67,7 @@ impl ExportTargetExecutor for Executor { async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()> { let mut points: Vec = Vec::with_capacity(mutation.upserts.len()); for upsert in mutation.upserts.iter() { - let key_fields = key_value_fields_iter(&self.key_fields_schema, &upsert.key)?; - let point_id = key_to_point_id(key_fields)?; + let point_id = key_to_point_id(&upsert.key)?; let (payload, vectors) = values_to_payload(&upsert.value.fields, &self.value_fields_schema)?; @@ -99,17 +80,12 @@ impl ExportTargetExecutor for Executor { Ok(()) } } -fn key_to_point_id(key_values: &[KeyValue]) -> Result { - let point_id = if let Some(key_value) = key_values.first() { - match key_value { - KeyValue::Str(v) => PointId::from(v.to_string()), - KeyValue::Int64(v) => PointId::from(*v as u64), - KeyValue::Uuid(v) => PointId::from(v.to_string()), - _ => bail!("Unsupported Qdrant Point ID key type"), - } - } else { - let uuid = Uuid::new_v4().to_string(); - PointId::from(uuid) +fn key_to_point_id(key_value: &KeyValue) -> Result { + let point_id = match key_value { + KeyValue::Str(v) => PointId::from(v.to_string()), + KeyValue::Int64(v) => PointId::from(*v as u64), + KeyValue::Uuid(v) => PointId::from(v.to_string()), + e => bail!("Invalid Qdrant point ID: {e}"), }; Ok(point_id) @@ -337,9 +313,9 @@ impl StorageFactoryBase for Arc { _storage_options: IndexOptions, _context: Arc, ) -> Result> { - if key_fields_schema.len() > 1 { + if key_fields_schema.len() != 1 { api_bail!( - "Expected only one primary key for the point ID. Got {}.", + "Expected one primary key field for the point ID. Got {}.", key_fields_schema.len() ) } From 86fc14c5368a726e8b88614362e5ac7cc5e0622d Mon Sep 17 00:00:00 2001 From: Anush008 Date: Mon, 14 Apr 2025 12:39:00 +0530 Subject: [PATCH 15/19] docs: examples/text_embedding_qdrant Signed-off-by: Anush008 --- docs/docs/ops/storages.md | 23 +- examples/text_embedding/README.md | 40 +-- examples/text_embedding/main.py | 49 +-- examples/text_embedding/pyproject.toml | 2 +- examples/text_embedding_qdrant/.env | 2 + examples/text_embedding_qdrant/README.md | 69 ++++ examples/text_embedding_qdrant/main.py | 90 +++++ .../markdown_files/rfc8259.md | 334 ++++++++++++++++++ examples/text_embedding_qdrant/pyproject.toml | 6 + 9 files changed, 539 insertions(+), 76 deletions(-) create mode 100644 examples/text_embedding_qdrant/.env create mode 100644 examples/text_embedding_qdrant/README.md create mode 100644 examples/text_embedding_qdrant/main.py create mode 100644 examples/text_embedding_qdrant/markdown_files/rfc8259.md create mode 100644 examples/text_embedding_qdrant/pyproject.toml diff --git a/docs/docs/ops/storages.md b/docs/docs/ops/storages.md index af75f88..320a76f 100644 --- a/docs/docs/ops/storages.md +++ b/docs/docs/ops/storages.md @@ -27,8 +27,21 @@ The spec takes the following fields: * `api_key` (type: `str`, optional). API key to authenticate requests with. -The field name for the vector embeddings must match the [vector name](https://qdrant.tech/documentation/concepts/vectors/#named-vectors) used when the collection was created. - -If no primary key is set during export, a random UUID is used as the Qdrant point ID. - -You can find an end-to-end example [here](https://github.com/cocoindex-io/cocoindex/tree/main/examples/text_embedding). +Before exporting, you must create a collection with a [vector name](https://qdrant.tech/documentation/concepts/vectors/#named-vectors) that matches the vector field name in CocoIndex, and set `setup_by_user=True` during export. + +Example: + +```python +doc_embeddings.export( + "doc_embeddings", + cocoindex.storages.Qdrant( + collection_name="cocoindex", + grpc_url="http://xyz-example.cloud-region.cloud-provider.cloud.qdrant.io:6334/", + api_key="", + ), + primary_key_fields=["id_field"], + setup_by_user=True, +) +``` + +You can find an end-to-end example [here](https://github.com/cocoindex-io/cocoindex/tree/main/examples/text_embedding_qdrant). diff --git a/examples/text_embedding/README.md b/examples/text_embedding/README.md index 6d66a4f..3c6dde7 100644 --- a/examples/text_embedding/README.md +++ b/examples/text_embedding/README.md @@ -1,34 +1,7 @@ -## Description +Simple example for cocoindex: build embedding index based on local files. -Example to build a vector index in Qdrant based on local files. - -## Pre-requisites - -- [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one. - -- Run Qdrant. - -```bash -docker run -d -p 6334:6334 -p 6333:6333 qdrant/qdrant -``` - -- [Create a collection](https://qdrant.tech/documentation/concepts/vectors/#named-vectors) to export the embeddings to. - -```bash -curl -X PUT \ - 'http://localhost:6333/collections/cocoindex' \ - --header 'Content-Type: application/json' \ - --data-raw '{ - "vectors": { - "text_embedding": { - "size": 384, - "distance": "Cosine" - } - } -}' -``` - -You can view the collections and data with the Qdrant dashboard at . +## Prerequisite +[Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one. ## Run @@ -56,14 +29,13 @@ Run: python main.py ``` -## CocoInsight - +## CocoInsight CocoInsight is in Early Access now (Free) 😊 You found us! A quick 3 minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9). Run CocoInsight to understand your RAG data pipeline: -```bash +``` python main.py cocoindex server -c https://cocoindex.io ``` -Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight). +Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight). \ No newline at end of file diff --git a/examples/text_embedding/main.py b/examples/text_embedding/main.py index 57f27a4..1ee8726 100644 --- a/examples/text_embedding/main.py +++ b/examples/text_embedding/main.py @@ -2,7 +2,6 @@ import cocoindex - def text_to_embedding(text: cocoindex.DataSlice) -> cocoindex.DataSlice: """ Embed the text using a SentenceTransformer model. @@ -10,61 +9,40 @@ def text_to_embedding(text: cocoindex.DataSlice) -> cocoindex.DataSlice: """ return text.transform( cocoindex.functions.SentenceTransformerEmbed( - model="sentence-transformers/all-MiniLM-L6-v2" - ) - ) - + model="sentence-transformers/all-MiniLM-L6-v2")) @cocoindex.flow_def(name="TextEmbedding") -def text_embedding_flow( - flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope -): +def text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): """ Define an example flow that embeds text into a vector database. """ data_scope["documents"] = flow_builder.add_source( - cocoindex.sources.LocalFile(path="markdown_files") - ) + cocoindex.sources.LocalFile(path="markdown_files")) doc_embeddings = data_scope.add_collector() with data_scope["documents"].row() as doc: doc["chunks"] = doc["content"].transform( cocoindex.functions.SplitRecursively(), - language="markdown", - chunk_size=2000, - chunk_overlap=500, - ) + language="markdown", chunk_size=2000, chunk_overlap=500) with doc["chunks"].row() as chunk: chunk["embedding"] = text_to_embedding(chunk["text"]) - doc_embeddings.collect( - id=cocoindex.GeneratedField.UUID, - filename=doc["filename"], - location=chunk["location"], - text=chunk["text"], - # 'text_embedding' is the name of the vector we've created the Qdrant collection with. - text_embedding=chunk["embedding"], - ) + doc_embeddings.collect(filename=doc["filename"], location=chunk["location"], + text=chunk["text"], embedding=chunk["embedding"]) doc_embeddings.export( "doc_embeddings", - cocoindex.storages.Qdrant( - collection_name="cocoindex", grpc_url="http://localhost:6334/" - ), - primary_key_fields=["id"], - setup_by_user=True, - ) - + cocoindex.storages.Postgres(), + primary_key_fields=["filename", "location"], + vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)]) query_handler = cocoindex.query.SimpleSemanticsQueryHandler( name="SemanticsSearch", flow=text_embedding_flow, target_name="doc_embeddings", query_transform_flow=text_to_embedding, - default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY, -) - + default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY) @cocoindex.main_fn() def _run(): @@ -72,9 +50,9 @@ def _run(): while True: try: query = input("Enter search query (or Enter to quit): ") - if query == "": + if query == '': break - results, _ = query_handler.search(query, 10, "text_embedding") + results, _ = query_handler.search(query, 10) print("\nSearch results:") for result in results: print(f"[{result.score:.3f}] {result.data['filename']}") @@ -84,7 +62,6 @@ def _run(): except KeyboardInterrupt: break - if __name__ == "__main__": load_dotenv(override=True) - _run() + _run() \ No newline at end of file diff --git a/examples/text_embedding/pyproject.toml b/examples/text_embedding/pyproject.toml index 8546886..f27d2cd 100644 --- a/examples/text_embedding/pyproject.toml +++ b/examples/text_embedding/pyproject.toml @@ -3,4 +3,4 @@ name = "text-embedding" version = "0.1.0" description = "Simple example for cocoindex: build embedding index based on local text files." requires-python = ">=3.10" -dependencies = ["cocoindex>=0.1.19", "python-dotenv>=1.0.1"] +dependencies = ["cocoindex>=0.1.19", "python-dotenv>=1.0.1"] \ No newline at end of file diff --git a/examples/text_embedding_qdrant/.env b/examples/text_embedding_qdrant/.env new file mode 100644 index 0000000..335f306 --- /dev/null +++ b/examples/text_embedding_qdrant/.env @@ -0,0 +1,2 @@ +# Postgres database address for cocoindex +COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex diff --git a/examples/text_embedding_qdrant/README.md b/examples/text_embedding_qdrant/README.md new file mode 100644 index 0000000..6d66a4f --- /dev/null +++ b/examples/text_embedding_qdrant/README.md @@ -0,0 +1,69 @@ +## Description + +Example to build a vector index in Qdrant based on local files. + +## Pre-requisites + +- [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) if you don't have one. + +- Run Qdrant. + +```bash +docker run -d -p 6334:6334 -p 6333:6333 qdrant/qdrant +``` + +- [Create a collection](https://qdrant.tech/documentation/concepts/vectors/#named-vectors) to export the embeddings to. + +```bash +curl -X PUT \ + 'http://localhost:6333/collections/cocoindex' \ + --header 'Content-Type: application/json' \ + --data-raw '{ + "vectors": { + "text_embedding": { + "size": 384, + "distance": "Cosine" + } + } +}' +``` + +You can view the collections and data with the Qdrant dashboard at . + +## Run + +Install dependencies: + +```bash +pip install -e . +``` + +Setup: + +```bash +python main.py cocoindex setup +``` + +Update index: + +```bash +python main.py cocoindex update +``` + +Run: + +```bash +python main.py +``` + +## CocoInsight + +CocoInsight is in Early Access now (Free) 😊 You found us! A quick 3 minute video tutorial about CocoInsight: [Watch on YouTube](https://youtu.be/ZnmyoHslBSc?si=pPLXWALztkA710r9). + +Run CocoInsight to understand your RAG data pipeline: + +```bash +python main.py cocoindex server -c https://cocoindex.io +``` + +Then open the CocoInsight UI at [https://cocoindex.io/cocoinsight](https://cocoindex.io/cocoinsight). diff --git a/examples/text_embedding_qdrant/main.py b/examples/text_embedding_qdrant/main.py new file mode 100644 index 0000000..57f27a4 --- /dev/null +++ b/examples/text_embedding_qdrant/main.py @@ -0,0 +1,90 @@ +from dotenv import load_dotenv + +import cocoindex + + +def text_to_embedding(text: cocoindex.DataSlice) -> cocoindex.DataSlice: + """ + Embed the text using a SentenceTransformer model. + This is a shared logic between indexing and querying, so extract it as a function. + """ + return text.transform( + cocoindex.functions.SentenceTransformerEmbed( + model="sentence-transformers/all-MiniLM-L6-v2" + ) + ) + + +@cocoindex.flow_def(name="TextEmbedding") +def text_embedding_flow( + flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope +): + """ + Define an example flow that embeds text into a vector database. + """ + data_scope["documents"] = flow_builder.add_source( + cocoindex.sources.LocalFile(path="markdown_files") + ) + + doc_embeddings = data_scope.add_collector() + + with data_scope["documents"].row() as doc: + doc["chunks"] = doc["content"].transform( + cocoindex.functions.SplitRecursively(), + language="markdown", + chunk_size=2000, + chunk_overlap=500, + ) + + with doc["chunks"].row() as chunk: + chunk["embedding"] = text_to_embedding(chunk["text"]) + doc_embeddings.collect( + id=cocoindex.GeneratedField.UUID, + filename=doc["filename"], + location=chunk["location"], + text=chunk["text"], + # 'text_embedding' is the name of the vector we've created the Qdrant collection with. + text_embedding=chunk["embedding"], + ) + + doc_embeddings.export( + "doc_embeddings", + cocoindex.storages.Qdrant( + collection_name="cocoindex", grpc_url="http://localhost:6334/" + ), + primary_key_fields=["id"], + setup_by_user=True, + ) + + +query_handler = cocoindex.query.SimpleSemanticsQueryHandler( + name="SemanticsSearch", + flow=text_embedding_flow, + target_name="doc_embeddings", + query_transform_flow=text_to_embedding, + default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY, +) + + +@cocoindex.main_fn() +def _run(): + # Run queries in a loop to demonstrate the query capabilities. + while True: + try: + query = input("Enter search query (or Enter to quit): ") + if query == "": + break + results, _ = query_handler.search(query, 10, "text_embedding") + print("\nSearch results:") + for result in results: + print(f"[{result.score:.3f}] {result.data['filename']}") + print(f" {result.data['text']}") + print("---") + print() + except KeyboardInterrupt: + break + + +if __name__ == "__main__": + load_dotenv(override=True) + _run() diff --git a/examples/text_embedding_qdrant/markdown_files/rfc8259.md b/examples/text_embedding_qdrant/markdown_files/rfc8259.md new file mode 100644 index 0000000..b0911a4 --- /dev/null +++ b/examples/text_embedding_qdrant/markdown_files/rfc8259.md @@ -0,0 +1,334 @@ +Internet Engineering Task Force (IETF) T. Bray, Ed. Request for Comments: 8259 Textuality Obsoletes: 7159 December 2017 Category: Standards Track ISSN: 2070-1721 + +The JavaScript Object Notation (JSON) Data Interchange Format + +Abstract + + JavaScript Object Notation (JSON) is a lightweight, text-based, language-independent data interchange format. It was derived from the ECMAScript Programming Language Standard. JSON defines a small set of formatting rules for the portable representation of structured data. + + This document removes inconsistencies with other specifications of JSON, repairs specification errors, and offers experience-based interoperability guidance. + +Status of This Memo + +This is an Internet Standards Track document. + + This document is a product of the Internet Engineering Task Force (IETF). It represents the consensus of the IETF community. It has received public review and has been approved for publication by the Internet Engineering Steering Group (IESG). Further information on Internet Standards is available in Section 2 of RFC 7841. + + Information about the current status of this document, any errata, and how to provide feedback on it may be obtained at https://www.rfc-editor.org/info/rfc8259. + +Bray Standards Track [Page 1] + +Copyright Notice + + Copyright (c) 2017 IETF Trust and the persons identified as the document authors. All rights reserved. + + This document is subject to BCP 78 and the IETF Trust's Legal Provisions Relating to IETF Documents (https://trustee.ietf.org/license-info) in effect on the date of publication of this document. Please review these documents carefully, as they describe your rights and restrictions with respect to this document. Code Components extracted from this document must include Simplified BSD License text as described in Section 4.e of the Trust Legal Provisions and are provided without warranty as + +described in the Simplified BSD License. + + This document may contain material from IETF Documents or IETF Contributions published or made publicly available before November 10, 2008. The person(s) controlling the copyright in some of this material may not have granted the IETF Trust the right to allow modifications of such material outside the IETF Standards Process. Without obtaining an adequate license from the person(s) controlling the copyright in such materials, this document may not be modified outside the IETF Standards Process, and derivative works of it may not be created outside the IETF Standards Process, except to format it for publication as an RFC or to translate it into languages other than English. + +Bray Standards Track [Page 2] + + +## 1. Introduction + + JavaScript Object Notation (JSON) is a text format for the serialization of structured data. It is derived from the object literals of JavaScript, as defined in the ECMAScript Programming Language Standard, Third Edition [ECMA-262]. + + JSON can represent four primitive types (strings, numbers, booleans, and null) and two structured types (objects and arrays). + + A string is a sequence of zero or more Unicode characters [UNICODE]. Note that this citation references the latest version of Unicode rather than a specific release. It is not expected that future changes in the Unicode specification will impact the syntax of JSON. + + An object is an unordered collection of zero or more name/value pairs, where a name is a string and a value is a string, number, boolean, null, object, or array. + +An array is an ordered sequence of zero or more values. + +Bray Standards Track [Page 3] + + The terms "object" and "array" come from the conventions of JavaScript. + + JSON's design goals were for it to be minimal, portable, textual, and a subset of JavaScript. + +1.1. Conventions Used in This Document + + The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "NOT RECOMMENDED", "MAY", and "OPTIONAL" in this document are to be interpreted as described in BCP 14 [RFC2119] [RFC8174] when, and only when, they appear in all capitals, as shown here. + + The grammatical rules in this document are to be interpreted as described in [RFC5234]. + +1.2. Specifications of JSON + + This document replaces [RFC7159]. [RFC7159] obsoleted [RFC4627], which originally described JSON and registered the media type "application/json". + +JSON is also described in [ECMA-404]. + + The reference to ECMA-404 in the previous sentence is normative, not with the usual meaning that implementors need to consult it in order to understand this document, but to emphasize that there are no inconsistencies in the definition of the term "JSON text" in any of its specifications. Note, however, that ECMA-404 allows several practices that this specification recommends avoiding in the interests of maximal interoperability. + + The intent is that the grammar is the same between the two documents, although different descriptions are used. If there is a difference found between them, ECMA and the IETF will work together to update both documents. + + If an error is found with either document, the other should be examined to see if it has a similar error; if it does, it should be fixed, if possible. + + If either document is changed in the future, ECMA and the IETF will work together to ensure that the two documents stay aligned through the change. + +Bray Standards Track [Page 4] + +- 1.3. Introduction to This Revision + In the years since the publication of RFC 4627, JSON has found very wide use. This experience has revealed certain patterns that, while allowed by its specifications, have caused interoperability problems. + + Also, a small number of errata have been reported regarding RFC 4627 (see RFC Errata IDs 607 [Err607] and 3607 [Err3607]) and regarding RFC 7159 (see RFC Errata IDs 3915 [Err3915], 4264 [Err4264], 4336 [Err4336], and 4388 [Err4388]). + + This document's goal is to apply the errata, remove inconsistencies with other specifications of JSON, and highlight practices that can lead to interoperability problems. + +- 2. JSON Grammar + A JSON text is a sequence of tokens. The set of tokens includes six structural characters, strings, numbers, and three literal names. + + A JSON text is a serialized value. Note that certain previous specifications of JSON constrained a JSON text to be an object or an array. Implementations that generate only objects or arrays where a JSON text is called for will be interoperable in the sense that all implementations will accept these as conforming JSON texts. + +JSON-text = ws value ws + +These are the six structural characters: + +| begin-array | | | | = ws %x5B ws ; [ left square bracket | +|----------------------------------------|--|--|--|---------------------------------------| +| begin-object | | | | = ws %x7B ws ; { left curly bracket | +| end-array | | | | = ws %x5D ws ; ] right square bracket | +| end-object | | | | = ws %x7D ws ; } right curly bracket | +| name-separator = ws %x3A ws ; : colon | | | | | +| value-separator = ws %x2C ws ; , comma | | | | | + +Bray Standards Track [Page 5] + + Insignificant whitespace is allowed before or after any of the six structural characters. + + ws = *( %x20 / ; Space %x09 / ; Horizontal tab %x0A / ; Line feed or New line %x0D ) ; Carriage return + +## 3. Values + + A JSON value MUST be an object, array, number, or string, or one of the following three literal names: + + false null true + + The literal names MUST be lowercase. No other literal names are allowed. + + value = false / null / true / object / array / number / string false = %x66.61.6c.73.65 ; false null = %x6e.75.6c.6c ; null true = %x74.72.75.65 ; true + +## 4. Objects + + An object structure is represented as a pair of curly brackets surrounding zero or more name/value pairs (or members). A name is a string. A single colon comes after each name, separating the name from the value. A single comma separates a value from a following name. The names within an object SHOULD be unique. + + object = begin-object [ member *( value-separator member ) ] end-object + +member = string name-separator value + + An object whose names are all unique is interoperable in the sense that all software implementations receiving that object will agree on the name-value mappings. When the names within an object are not unique, the behavior of software that receives such an object is unpredictable. Many implementations report the last name/value pair only. Other implementations report an error or fail to parse the + +Bray Standards Track [Page 6] + + object, and some implementations report all of the name/value pairs, including duplicates. + + JSON parsing libraries have been observed to differ as to whether or not they make the ordering of object members visible to calling software. Implementations whose behavior does not depend on member ordering will be interoperable in the sense that they will not be affected by these differences. + +5. Arrays + + An array structure is represented as square brackets surrounding zero or more values (or elements). Elements are separated by commas. + +array = begin-array [ value *( value-separator value ) ] end-array + + There is no requirement that the values in an array be of the same type. + +6. Numbers + + The representation of numbers is similar to that used in most programming languages. A number is represented in base 10 using decimal digits. It contains an integer component that may be prefixed with an optional minus sign, which may be followed by a fraction part and/or an exponent part. Leading zeros are not allowed. + +A fraction part is a decimal point followed by one or more digits. + + An exponent part begins with the letter E in uppercase or lowercase, which may be followed by a plus or minus sign. The E and optional sign are followed by one or more digits. + + Numeric values that cannot be represented in the grammar below (such as Infinity and NaN) are not permitted. + + number = [ minus ] int [ frac ] [ exp ] decimal-point = %x2E ; . digit1-9 = %x31-39 ; 1-9 e = %x65 / %x45 ; e E exp = e [ minus / plus ] 1*DIGIT frac = decimal-point 1*DIGIT + +Bray Standards Track [Page 7] + + int = zero / ( digit1-9 *DIGIT ) minus = %x2D ; plus = %x2B ; + zero = %x30 ; 0 + + This specification allows implementations to set limits on the range and precision of numbers accepted. Since software that implements IEEE 754 binary64 (double precision) numbers [IEEE754] is generally available and widely used, good interoperability can be achieved by implementations that expect no more precision or range than these provide, in the sense that implementations will approximate JSON numbers within the expected precision. A JSON number such as 1E400 or 3.141592653589793238462643383279 may indicate potential interoperability problems, since it suggests that the software that created it expects receiving software to have greater capabilities for numeric magnitude and precision than is widely available. + + Note that when such software is used, numbers that are integers and are in the range [-(2**53)+1, (2**53)-1] are interoperable in the sense that implementations will agree exactly on their numeric values. + +## 7. Strings + + The representation of strings is similar to conventions used in the C family of programming languages. A string begins and ends with quotation marks. All Unicode characters may be placed within the quotation marks, except for the characters that MUST be escaped: quotation mark, reverse solidus, and the control characters (U+0000 through U+001F). + + Any character may be escaped. If the character is in the Basic Multilingual Plane (U+0000 through U+FFFF), then it may be represented as a six-character sequence: a reverse solidus, followed by the lowercase letter u, followed by four hexadecimal digits that encode the character's code point. The hexadecimal letters A through F can be uppercase or lowercase. So, for example, a string containing only a single reverse solidus character may be represented as "\u005C". + + Alternatively, there are two-character sequence escape representations of some popular characters. So, for example, a string containing only a single reverse solidus character may be represented more compactly as "\\". + +Bray Standards Track [Page 8] + + To escape an extended character that is not in the Basic Multilingual Plane, the character is represented as a 12-character sequence, encoding the UTF-16 surrogate pair. So, for example, a string containing only the G clef character (U+1D11E) may be represented as "\uD834\uDD1E". + + string = quotation-mark *char quotation-mark char = unescaped / escape ( %x22 / ; " quotation mark U+0022 %x5C / ; \ reverse solidus U+005C %x2F / ; / solidus U+002F %x62 / ; b backspace U+0008 %x66 / ; f form feed U+000C %x6E / ; n line feed U+000A %x72 / ; r carriage return U+000D %x74 / ; t tab U+0009 %x75 4HEXDIG ) ; uXXXX U+XXXX escape = %x5C ; \ quotation-mark = %x22 ; " unescaped = %x20-21 / %x23-5B / %x5D-10FFFF + +# 8. String and Character Issues + +- 8.1. Character Encoding + JSON text exchanged between systems that are not part of a closed ecosystem MUST be encoded using UTF-8 [RFC3629]. + + Previous specifications of JSON have not required the use of UTF-8 when transmitting JSON text. However, the vast majority of JSON based software implementations have chosen to use the UTF-8 encoding, to the extent that it is the only encoding that achieves interoperability. + + Implementations MUST NOT add a byte order mark (U+FEFF) to the beginning of a networked-transmitted JSON text. In the interests of interoperability, implementations that parse JSON texts MAY ignore the presence of a byte order mark rather than treating it as an error. + +Bray Standards Track [Page 9] + +## 8.2. Unicode Characters + + When all the strings represented in a JSON text are composed entirely of Unicode characters [UNICODE] (however escaped), then that JSON text is interoperable in the sense that all software implementations that parse it will agree on the contents of names and of string values in objects and arrays. + + However, the ABNF in this specification allows member names and string values to contain bit sequences that cannot encode Unicode characters; for example, "\uDEAD" (a single unpaired UTF-16 surrogate). Instances of this have been observed, for example, when a library truncates a UTF-16 string without checking whether the truncation split a surrogate pair. The behavior of software that receives JSON texts containing such values is unpredictable; for example, implementations might return different values for the length of a string value or even suffer fatal runtime exceptions. + +## 8.3. String Comparison + + Software implementations are typically required to test names of object members for equality. Implementations that transform the textual representation into sequences of Unicode code units and then perform the comparison numerically, code unit by code unit, are interoperable in the sense that implementations will agree in all cases on equality or inequality of two strings. For example, implementations that compare strings with escaped characters unconverted may incorrectly find that "a\\b" and "a\u005Cb" are not equal. + +## 9. Parsers + + A JSON parser transforms a JSON text into another representation. A JSON parser MUST accept all texts that conform to the JSON grammar. A JSON parser MAY accept non-JSON forms or extensions. + + An implementation may set limits on the size of texts that it accepts. An implementation may set limits on the maximum depth of nesting. An implementation may set limits on the range and precision of numbers. An implementation may set limits on the length and character contents of strings. + +## 10. Generators + + A JSON generator produces JSON text. The resulting text MUST strictly conform to the JSON grammar. + +Bray Standards Track [Page 10] + +11. IANA Considerations + +The media type for JSON text is application/json. + +Type name: application + +Subtype name: json + +Required parameters: n/a + +Optional parameters: n/a + +Encoding considerations: binary + +Security considerations: See RFC 8259, Section 12 + +Interoperability considerations: Described in RFC 8259 + +Published specification: RFC 8259 + + Applications that use this media type: JSON has been used to exchange data between applications written in all of these programming languages: ActionScript, C, C#, Clojure, ColdFusion, Common Lisp, E, Erlang, Go, Java, JavaScript, Lua, Objective CAML, Perl, PHP, Python, Rebol, Ruby, Scala, and Scheme. + + Additional information: Magic number(s): n/a File extension(s): .json Macintosh file type code(s): TEXT + + Person & email address to contact for further information: IESG + +Intended usage: COMMON + +Restrictions on usage: none + + Author: Douglas Crockford + + Change controller: IESG + +Bray Standards Track [Page 11] + + Note: No "charset" parameter is defined for this registration. Adding one really has no effect on compliant recipients. + +- 12. Security Considerations + Generally, there are security issues with scripting languages. JSON is a subset of JavaScript but excludes assignment and invocation. + + Since JSON's syntax is borrowed from JavaScript, it is possible to use that language's "eval()" function to parse most JSON texts (but not all; certain characters such as U+2028 LINE SEPARATOR and U+2029 PARAGRAPH SEPARATOR are legal in JSON but not JavaScript). This generally constitutes an unacceptable security risk, since the text could contain executable code along with data declarations. The same consideration applies to the use of eval()-like functions in any other programming language in which JSON texts conform to that language's syntax. + +## 13. Examples + + This is a JSON object: { "Image": { "Width": 800, "Height": 600, "Title": "View from 15th Floor", "Thumbnail": { "Url": "http://www.example.com/image/481989943", "Height": 125, "Width": 100 }, "Animated" : false, "IDs": [116, 943, 234, 38793] } } + + Its Image member is an object whose Thumbnail member is an object and whose IDs member is an array of numbers. + +Bray Standards Track [Page 12] + +``` + This is a JSON array containing two objects: +[ +{ +"precision": "zip", +"Latitude": 37.7668, +"Longitude": -122.3959, +"Address": "", +"City": "SAN FRANCISCO", +"State": "CA", +"Zip": "94107", +"Country": "US" +}, +{ +"precision": "zip", +"Latitude": 37.371991, +"Longitude": -122.026020, +"Address": "", +"City": "SUNNYVALE", +"State": "CA", +"Zip": "94085", +"Country": "US" +} +] +Here are three small JSON texts containing only values: +"Hello world!" +42 +true +``` +Bray Standards Track [Page 13] + +## 14. References + +- 14.1. Normative References +- [ECMA-404] Ecma International, "The JSON Data Interchange Format", Standard ECMA-404, . +- [IEEE754] IEEE, "IEEE Standard for Floating-Point Arithmetic", IEEE 754. +- [RFC2119] Bradner, S., "Key words for use in RFCs to Indicate Requirement Levels", BCP 14, RFC 2119, DOI 10.17487/RFC2119, March 1997, . +- [RFC3629] Yergeau, F., "UTF-8, a transformation format of ISO 10646", STD 63, RFC 3629, DOI 10.17487/RFC3629, November 2003, . +- [RFC5234] Crocker, D., Ed. and P. Overell, "Augmented BNF for Syntax Specifications: ABNF", STD 68, RFC 5234, DOI 10.17487/RFC5234, January 2008, . +- [RFC8174] Leiba, B., "Ambiguity of Uppercase vs Lowercase in RFC 2119 Key Words", BCP 14, RFC 8174, DOI 10.17487/RFC8174, May 2017, . +- [UNICODE] The Unicode Consortium, "The Unicode Standard", . +- 14.2. Informative References +- [ECMA-262] Ecma International, "ECMAScript Language Specification", Standard ECMA-262, Third Edition, December 1999, . +- [Err3607] RFC Errata, Erratum ID 3607, RFC 4627, . +- [Err3915] RFC Errata, Erratum ID 3915, RFC 7159, . + +Bray Standards Track [Page 14] + +- [Err4264] RFC Errata, Erratum ID 4264, RFC 7159, . +- [Err4336] RFC Errata, Erratum ID 4336, RFC 7159, . +- [Err4388] RFC Errata, Erratum ID 4388, RFC 7159, . +- [Err607] RFC Errata, Erratum ID 607, RFC 4627, . +- [RFC4627] Crockford, D., "The application/json Media Type for JavaScript Object Notation (JSON)", RFC 4627, DOI 10.17487/RFC4627, July 2006, . +- [RFC7159] Bray, T., Ed., "The JavaScript Object Notation (JSON) Data Interchange Format", RFC 7159, DOI 10.17487/RFC7159, March 2014, . + +Bray Standards Track [Page 15] + +Appendix A. Changes from RFC 7159 + + This section lists changes between this document and the text in RFC 7159. + +- o Section 1.2 has been updated to reflect the removal of a JSON specification from ECMA-262, to make ECMA-404 a normative reference, and to explain the particular meaning of "normative". +- o Section 1.3 has been updated to reflect errata filed against RFC 7159, not RFC 4627. +- o Section 8.1 was changed to require the use of UTF-8 when transmitted over a network. +- o Section 12 has been updated to increase the precision of the description of the security risk that follows from using the ECMAScript "eval()" function. +- o Section 14.1 has been updated to include ECMA-404 as a normative reference. +- o Section 14.2 has been updated to remove ECMA-404, update the version of ECMA-262, and refresh the errata list. + +Contributors + + RFC 4627 was written by Douglas Crockford. This document was constructed by making a relatively small number of changes to that document; thus, the vast majority of the text here is his. + +Author's Address + + Tim Bray (editor) Textuality + +Email: tbray@textuality.com + +Bray Standards Track [Page 16] \ No newline at end of file diff --git a/examples/text_embedding_qdrant/pyproject.toml b/examples/text_embedding_qdrant/pyproject.toml new file mode 100644 index 0000000..8546886 --- /dev/null +++ b/examples/text_embedding_qdrant/pyproject.toml @@ -0,0 +1,6 @@ +[project] +name = "text-embedding" +version = "0.1.0" +description = "Simple example for cocoindex: build embedding index based on local text files." +requires-python = ">=3.10" +dependencies = ["cocoindex>=0.1.19", "python-dotenv>=1.0.1"] From f749375b90ace2e932263a9e92e3264b5a66c49b Mon Sep 17 00:00:00 2001 From: Anush008 Date: Mon, 14 Apr 2025 12:42:20 +0530 Subject: [PATCH 16/19] chore: Undo change to examples/pdf_embedding Signed-off-by: Anush008 --- examples/pdf_embedding/main.py | 2 +- examples/text_embedding/main.py | 2 +- examples/text_embedding/pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/pdf_embedding/main.py b/examples/pdf_embedding/main.py index 7cb1b0d..a87ea9a 100644 --- a/examples/pdf_embedding/main.py +++ b/examples/pdf_embedding/main.py @@ -61,7 +61,7 @@ def pdf_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoinde doc_embeddings.export( "doc_embeddings", - cocoindex.storages.Qdrant(collection_name="cocoindex"), + cocoindex.storages.Postgres(), primary_key_fields=["id"], vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)]) diff --git a/examples/text_embedding/main.py b/examples/text_embedding/main.py index 1ee8726..70b3807 100644 --- a/examples/text_embedding/main.py +++ b/examples/text_embedding/main.py @@ -64,4 +64,4 @@ def _run(): if __name__ == "__main__": load_dotenv(override=True) - _run() \ No newline at end of file + _run() diff --git a/examples/text_embedding/pyproject.toml b/examples/text_embedding/pyproject.toml index f27d2cd..8546886 100644 --- a/examples/text_embedding/pyproject.toml +++ b/examples/text_embedding/pyproject.toml @@ -3,4 +3,4 @@ name = "text-embedding" version = "0.1.0" description = "Simple example for cocoindex: build embedding index based on local text files." requires-python = ">=3.10" -dependencies = ["cocoindex>=0.1.19", "python-dotenv>=1.0.1"] \ No newline at end of file +dependencies = ["cocoindex>=0.1.19", "python-dotenv>=1.0.1"] From d9c93d4d54cffa3d97dcc3aa05ddb66fa27ec208 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Mon, 14 Apr 2025 13:16:26 +0530 Subject: [PATCH 17/19] feat: Optionally delete points Signed-off-by: Anush008 --- examples/text_embedding_qdrant/pyproject.toml | 2 +- src/ops/storages/qdrant.rs | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/examples/text_embedding_qdrant/pyproject.toml b/examples/text_embedding_qdrant/pyproject.toml index 8546886..1505d90 100644 --- a/examples/text_embedding_qdrant/pyproject.toml +++ b/examples/text_embedding_qdrant/pyproject.toml @@ -1,5 +1,5 @@ [project] -name = "text-embedding" +name = "text-embedding-qdrant" version = "0.1.0" description = "Simple example for cocoindex: build embedding index based on local text files." requires-python = ">=3.10" diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index 32791a9..c47e550 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -10,7 +10,8 @@ use anyhow::{bail, Result}; use futures::FutureExt; use qdrant_client::qdrant::vectors_output::VectorsOptions; use qdrant_client::qdrant::{ - NamedVectors, PointId, PointStruct, UpsertPointsBuilder, Value as QdrantValue, + DeletePointsBuilder, NamedVectors, PointId, PointStruct, PointsIdsList, UpsertPointsBuilder, + Value as QdrantValue, }; use qdrant_client::qdrant::{Query, QueryPointsBuilder, ScoredPoint}; use qdrant_client::Qdrant; @@ -77,6 +78,23 @@ impl ExportTargetExecutor for Executor { self.client .upsert_points(UpsertPointsBuilder::new(&self.collection_name, points).wait(true)) .await?; + + let ids = mutation + .delete_keys + .iter() + .map(key_to_point_id) + .collect::>>()?; + + if !ids.is_empty() { + self.client + .delete_points( + DeletePointsBuilder::new(&self.collection_name) + .points(PointsIdsList { ids }) + .wait(true), + ) + .await?; + } + Ok(()) } } From 32d94f74d0ccb43387c92d1d10e246221f792d85 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Mon, 14 Apr 2025 13:41:51 +0530 Subject: [PATCH 18/19] chore: parse BasicValueType::Date | BasicValueType::LocalDateTime | BasicValueType::OffsetDateTime | BasicValueType::Time | BasicValueType::Uuid Signed-off-by: Anush008 --- src/ops/storages/qdrant.rs | 75 ++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 16 deletions(-) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index c47e550..e2a77b9 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -15,7 +15,6 @@ use qdrant_client::qdrant::{ }; use qdrant_client::qdrant::{Query, QueryPointsBuilder, ScoredPoint}; use qdrant_client::Qdrant; -use serde::Serialize; use serde_json::json; #[derive(Debug, Deserialize, Clone)] @@ -129,11 +128,11 @@ fn values_to_payload( BasicValue::Float32(v) => (*v as f64).into(), BasicValue::Float64(v) => (*v).into(), BasicValue::Range(v) => json!({ "start": v.start, "end": v.end }), - BasicValue::Uuid(v) => v.to_string().into(), - BasicValue::Date(v) => v.to_string().into(), - BasicValue::LocalDateTime(v) => v.to_string().into(), - BasicValue::Time(v) => v.to_string().into(), - BasicValue::OffsetDateTime(v) => v.to_string().into(), + BasicValue::Uuid(v) => json!({ "uuid": v.to_string() }), + BasicValue::Date(v) => json!({ "date": v.to_string() }), + BasicValue::LocalDateTime(v) => json!({ "local_datetime": v.to_string() }), + BasicValue::Time(v) => json!({ "time": v.to_string() }), + BasicValue::OffsetDateTime(v) => json!({ "offset_datetime": v.to_string() }), BasicValue::Json(v) => (**v).clone(), BasicValue::Vector(v) => { let vector = convert_to_vector(v.to_vec()); @@ -224,13 +223,59 @@ fn into_value(point: &ScoredPoint, schema: &FieldSchema) -> Result { } }), - BasicValueType::Date - | BasicValueType::LocalDateTime - | BasicValueType::OffsetDateTime - | BasicValueType::Time - | BasicValueType::Uuid => point.payload.get(field_name).and_then(|v| { - v.as_str() - .map(|s| BasicValue::Str(Arc::from(s.to_string()))) + BasicValueType::Uuid => point.payload.get(field_name).and_then(|v| { + v.as_struct().and_then(|s| { + s.fields + .get("uuid")? + .as_str()? + .parse() + .ok() + .map(BasicValue::Uuid) + }) + }), + + BasicValueType::Date => point.payload.get(field_name).and_then(|v| { + v.as_struct().and_then(|s| { + s.fields + .get("date")? + .as_str()? + .parse() + .ok() + .map(BasicValue::Date) + }) + }), + + BasicValueType::Time => point.payload.get(field_name).and_then(|v| { + v.as_struct().and_then(|s| { + s.fields + .get("time")? + .as_str()? + .parse() + .ok() + .map(BasicValue::Time) + }) + }), + + BasicValueType::LocalDateTime => point.payload.get(field_name).and_then(|v| { + v.as_struct().and_then(|s| { + s.fields + .get("local_datetime")? + .as_str()? + .parse() + .ok() + .map(BasicValue::LocalDateTime) + }) + }), + + BasicValueType::OffsetDateTime => point.payload.get(field_name).and_then(|v| { + v.as_struct().and_then(|s| { + s.fields + .get("offset_datetime")? + .as_str()? + .parse() + .ok() + .map(BasicValue::OffsetDateTime) + }) }), BasicValueType::Range => point.payload.get(field_name).and_then(|v| { v.as_struct().and_then(|s| { @@ -368,9 +413,7 @@ impl StorageFactoryBase for Arc { _existing: setup::CombinedState<()>, _auth_registry: &Arc, ) -> Result { - Err(anyhow!( - "Set `setup_by_user` to `true` to use Qdrant storage" - )) as Result + Err(anyhow!("Set `setup_by_user` to `true` to export to Qdrant")) as Result } fn check_state_compatibility( From 19dee3e1180180efb71cafa4dc8ccc3ef23ad82b Mon Sep 17 00:00:00 2001 From: Anush008 Date: Mon, 14 Apr 2025 22:40:27 +0530 Subject: [PATCH 19/19] refactor: Don't nest complex types Signed-off-by: Anush008 --- src/ops/storages/qdrant.rs | 81 ++++++++++++-------------------------- 1 file changed, 26 insertions(+), 55 deletions(-) diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index e2a77b9..f3d047b 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -128,11 +128,11 @@ fn values_to_payload( BasicValue::Float32(v) => (*v as f64).into(), BasicValue::Float64(v) => (*v).into(), BasicValue::Range(v) => json!({ "start": v.start, "end": v.end }), - BasicValue::Uuid(v) => json!({ "uuid": v.to_string() }), - BasicValue::Date(v) => json!({ "date": v.to_string() }), - BasicValue::LocalDateTime(v) => json!({ "local_datetime": v.to_string() }), - BasicValue::Time(v) => json!({ "time": v.to_string() }), - BasicValue::OffsetDateTime(v) => json!({ "offset_datetime": v.to_string() }), + BasicValue::Uuid(v) => v.to_string().into(), + BasicValue::Date(v) => v.to_string().into(), + BasicValue::LocalDateTime(v) => v.to_string().into(), + BasicValue::Time(v) => v.to_string().into(), + BasicValue::OffsetDateTime(v) => v.to_string().into(), BasicValue::Json(v) => (**v).clone(), BasicValue::Vector(v) => { let vector = convert_to_vector(v.to_vec()); @@ -223,60 +223,31 @@ fn into_value(point: &ScoredPoint, schema: &FieldSchema) -> Result { } }), - BasicValueType::Uuid => point.payload.get(field_name).and_then(|v| { - v.as_struct().and_then(|s| { - s.fields - .get("uuid")? - .as_str()? - .parse() - .ok() - .map(BasicValue::Uuid) - }) - }), + BasicValueType::Uuid => point + .payload + .get(field_name) + .and_then(|v| v.as_str()?.parse().ok().map(BasicValue::Uuid)), - BasicValueType::Date => point.payload.get(field_name).and_then(|v| { - v.as_struct().and_then(|s| { - s.fields - .get("date")? - .as_str()? - .parse() - .ok() - .map(BasicValue::Date) - }) - }), + BasicValueType::Date => point + .payload + .get(field_name) + .and_then(|v| v.as_str()?.parse().ok().map(BasicValue::Date)), - BasicValueType::Time => point.payload.get(field_name).and_then(|v| { - v.as_struct().and_then(|s| { - s.fields - .get("time")? - .as_str()? - .parse() - .ok() - .map(BasicValue::Time) - }) - }), + BasicValueType::Time => point + .payload + .get(field_name) + .and_then(|v| v.as_str()?.parse().ok().map(BasicValue::Time)), - BasicValueType::LocalDateTime => point.payload.get(field_name).and_then(|v| { - v.as_struct().and_then(|s| { - s.fields - .get("local_datetime")? - .as_str()? - .parse() - .ok() - .map(BasicValue::LocalDateTime) - }) - }), + BasicValueType::LocalDateTime => point + .payload + .get(field_name) + .and_then(|v| v.as_str()?.parse().ok().map(BasicValue::LocalDateTime)), + + BasicValueType::OffsetDateTime => point + .payload + .get(field_name) + .and_then(|v| v.as_str()?.parse().ok().map(BasicValue::OffsetDateTime)), - BasicValueType::OffsetDateTime => point.payload.get(field_name).and_then(|v| { - v.as_struct().and_then(|s| { - s.fields - .get("offset_datetime")? - .as_str()? - .parse() - .ok() - .map(BasicValue::OffsetDateTime) - }) - }), BasicValueType::Range => point.payload.get(field_name).and_then(|v| { v.as_struct().and_then(|s| { let start = s.fields.get("start").and_then(|f| f.as_integer());