Skip to content

Commit 61aa6d0

Browse files
authored
Merge branch 'main' into qdrant
2 parents b0e4a4d + b09af0d commit 61aa6d0

31 files changed

+651
-343
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ log = "0.4.26"
2525
regex = "1.11.1"
2626
serde = { version = "1.0.219", features = ["derive"] }
2727
serde_json = "1.0.140"
28-
sqlx = { version = "0.8.3", features = ["chrono", "postgres", "runtime-tokio"] }
28+
sqlx = { version = "0.8.3", features = ["chrono", "postgres", "runtime-tokio", "uuid"] }
2929
tokio = { version = "1.44.1", features = [
3030
"macros",
3131
"rt-multi-thread",
@@ -90,4 +90,4 @@ http-body-util = "0.1.3"
9090
yaml-rust2 = "0.10.0"
9191
urlencoding = "2.1.3"
9292
qdrant-client = "1.13.0"
93-
93+
uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }

docs/docs/core/data_types.mdx

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ This is the list of all basic types supported by CocoIndex:
2424
| float32 | `cocoindex.typing.Float32` |`float` |
2525
| float64 | `cocoindex.typing.Float64` |`float` |
2626
| range | `cocoindex.typing.Range` | `tuple[int, int]` |
27+
| uuid | `uuid.UUID` | `uuid.UUID` |
2728
| vector[*type*, *N*?] |`Annotated[list[type], cocoindex.typing.Vector(dim=N)]` | `list[type]` |
2829
| json | `cocoindex.typing.Json` | Any type convertible to JSON by `json` package |
2930

@@ -73,6 +74,7 @@ Currently, the following types are supported as types for key fields:
7374
- `bool`
7475
- `int64`
7576
- `range`
77+
- `uuid`
7678
- Struct with all fields being key types
7779

7880
### Vector Type

examples/pdf_embedding/main.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ def pdf_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoinde
5555

5656
with doc["chunks"].row() as chunk:
5757
chunk["embedding"] = chunk["text"].call(text_to_embedding)
58-
doc_embeddings.collect(filename=doc["filename"], location=chunk["location"],
58+
doc_embeddings.collect(id=cocoindex.GeneratedField.UUID,
59+
filename=doc["filename"], location=chunk["location"],
5960
text=chunk["text"], embedding=chunk["embedding"])
6061

6162
doc_embeddings.export(
6263
"doc_embeddings",
6364
cocoindex.storages.Qdrant(qdrant_url="http://localhost:6333", collection_name="cocoindex"),
64-
primary_key_fields=["filename", "location"],
65+
primary_key_fields=["id"],
6566
vector_index=[("embedding", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)])
6667

6768
query_handler = cocoindex.query.SimpleSemanticsQueryHandler(

python/cocoindex/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
Cocoindex is a framework for building and running indexing pipelines.
33
"""
44
from . import flow, functions, query, sources, storages, cli
5-
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def, EvaluateAndDumpOptions
5+
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def
6+
from .flow import EvaluateAndDumpOptions, GeneratedField
67
from .llm import LlmSpec, LlmApiType
78
from .vector import VectorSimilarityMetric
89
from .lib import *

python/cocoindex/convert.py

+6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
import dataclasses
55
import inspect
6+
import uuid
67

78
from typing import Any, Callable
89
from .typing import analyze_type_info, COLLECTION_TYPES
@@ -13,6 +14,8 @@ def to_engine_value(value: Any) -> Any:
1314
return [to_engine_value(getattr(value, f.name)) for f in dataclasses.fields(value)]
1415
if isinstance(value, (list, tuple)):
1516
return [to_engine_value(v) for v in value]
17+
if isinstance(value, uuid.UUID):
18+
return value.bytes
1619
return value
1720

1821
def make_engine_value_converter(
@@ -62,6 +65,9 @@ def make_engine_value_converter(
6265
field_path.pop()
6366
return lambda value: [elem_converter(v) for v in value] if value is not None else None
6467

68+
if src_type_kind == 'Uuid':
69+
return lambda value: uuid.UUID(bytes=value)
70+
6571
return lambda value: value
6672

6773
def _make_engine_struct_value_converter(

python/cocoindex/flow.py

+21-2
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,12 @@ def add_collector(self, name: str | None = None) -> DataCollector:
238238
)
239239
)
240240

241+
class GeneratedField(Enum):
242+
"""
243+
A generated field is automatically set by the engine.
244+
"""
245+
UUID = "Uuid"
246+
241247
class DataCollector:
242248
"""A data collector is used to collect data into a collector."""
243249
_flow_builder_state: _FlowBuilderState
@@ -248,12 +254,25 @@ def __init__(self, flow_builder_state: _FlowBuilderState,
248254
self._flow_builder_state = flow_builder_state
249255
self._engine_data_collector = data_collector
250256

251-
def collect(self, **kwargs: DataSlice):
257+
def collect(self, **kwargs: DataSlice | GeneratedField):
252258
"""
253259
Collect data into the collector.
254260
"""
261+
regular_kwargs = []
262+
auto_uuid_field = None
263+
for k, v in kwargs.items():
264+
if isinstance(v, GeneratedField):
265+
if v == GeneratedField.UUID:
266+
if auto_uuid_field is not None:
267+
raise ValueError("Only one generated UUID field is allowed")
268+
auto_uuid_field = k
269+
else:
270+
raise ValueError(f"Unexpected generated field: {v}")
271+
else:
272+
regular_kwargs.append((k, _data_slice_state(v).engine_data_slice))
273+
255274
self._flow_builder_state.engine_flow_builder.collect(
256-
self._engine_data_collector, [(k, _data_slice_state(v).engine_data_slice) for k, v in kwargs.items()])
275+
self._engine_data_collector, regular_kwargs, auto_uuid_field)
257276

258277
def export(self, name: str, target_spec: op.StorageSpec, /, *,
259278
primary_key_fields: Sequence[str] | None = None,

python/cocoindex/typing.py

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import dataclasses
44
import types
55
import inspect
6+
import uuid
67
from typing import Annotated, NamedTuple, Any, TypeVar, TYPE_CHECKING, overload
78

89
class Vector(NamedTuple):
@@ -130,6 +131,8 @@ def analyze_type_info(t) -> AnalyzedTypeInfo:
130131
kind = 'Int64'
131132
elif t is float:
132133
kind = 'Float64'
134+
elif t is uuid.UUID:
135+
kind = 'Uuid'
133136
else:
134137
raise ValueError(f"type unsupported yet: {t}")
135138

src/base/json_schema.rs

+41-15
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,19 @@ use schemars::schema::{
33
ArrayValidation, InstanceType, Metadata, ObjectValidation, Schema, SchemaObject, SingleOrVec,
44
};
55

6+
pub struct ToJsonSchemaOptions {
7+
/// If true, mark all fields as required.
8+
/// Use union type (with `null`) for optional fields instead.
9+
/// Models like OpenAI will reject the schema if a field is not required.
10+
pub fields_always_required: bool,
11+
}
12+
613
pub trait ToJsonSchema {
7-
fn to_json_schema(&self) -> SchemaObject;
14+
fn to_json_schema(&self, options: &ToJsonSchemaOptions) -> SchemaObject;
815
}
916

1017
impl ToJsonSchema for schema::BasicValueType {
11-
fn to_json_schema(&self) -> SchemaObject {
18+
fn to_json_schema(&self, options: &ToJsonSchemaOptions) -> SchemaObject {
1219
let mut schema = SchemaObject::default();
1320
match self {
1421
schema::BasicValueType::Str => {
@@ -26,9 +33,6 @@ impl ToJsonSchema for schema::BasicValueType {
2633
schema::BasicValueType::Float32 | schema::BasicValueType::Float64 => {
2734
schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::Number)));
2835
}
29-
schema::BasicValueType::Json => {
30-
// Can be any value. No type constraint.
31-
}
3236
schema::BasicValueType::Range => {
3337
schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::Array)));
3438
schema.array = Some(Box::new(ArrayValidation {
@@ -51,11 +55,18 @@ impl ToJsonSchema for schema::BasicValueType {
5155
.description =
5256
Some("A range, start pos (inclusive), end pos (exclusive).".to_string());
5357
}
58+
schema::BasicValueType::Uuid => {
59+
schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String)));
60+
schema.format = Some("uuid".to_string());
61+
}
62+
schema::BasicValueType::Json => {
63+
// Can be any value. No type constraint.
64+
}
5465
schema::BasicValueType::Vector(s) => {
5566
schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::Array)));
5667
schema.array = Some(Box::new(ArrayValidation {
5768
items: Some(SingleOrVec::Single(Box::new(
58-
s.element_type.to_json_schema().into(),
69+
s.element_type.to_json_schema(options).into(),
5970
))),
6071
min_items: s.dimension.and_then(|d| u32::try_from(d).ok()),
6172
max_items: s.dimension.and_then(|d| u32::try_from(d).ok()),
@@ -68,7 +79,7 @@ impl ToJsonSchema for schema::BasicValueType {
6879
}
6980

7081
impl ToJsonSchema for schema::StructSchema {
71-
fn to_json_schema(&self) -> SchemaObject {
82+
fn to_json_schema(&self, options: &ToJsonSchemaOptions) -> SchemaObject {
7283
SchemaObject {
7384
metadata: Some(Box::new(Metadata {
7485
description: self.description.as_ref().map(|s| s.to_string()),
@@ -79,12 +90,25 @@ impl ToJsonSchema for schema::StructSchema {
7990
properties: self
8091
.fields
8192
.iter()
82-
.map(|f| (f.name.to_string(), f.value_type.to_json_schema().into()))
93+
.map(|f| {
94+
let mut schema = f.value_type.to_json_schema(options);
95+
if options.fields_always_required && f.value_type.nullable {
96+
if let Some(instance_type) = &mut schema.instance_type {
97+
let mut types = match instance_type {
98+
SingleOrVec::Single(t) => vec![**t],
99+
SingleOrVec::Vec(t) => std::mem::take(t),
100+
};
101+
types.push(InstanceType::Null);
102+
*instance_type = SingleOrVec::Vec(types);
103+
}
104+
}
105+
(f.name.to_string(), schema.into())
106+
})
83107
.collect(),
84108
required: self
85109
.fields
86110
.iter()
87-
.filter(|&f| (!f.value_type.nullable))
111+
.filter(|&f| (options.fields_always_required || !f.value_type.nullable))
88112
.map(|f| f.name.to_string())
89113
.collect(),
90114
additional_properties: Some(Schema::Bool(false).into()),
@@ -96,14 +120,16 @@ impl ToJsonSchema for schema::StructSchema {
96120
}
97121

98122
impl ToJsonSchema for schema::ValueType {
99-
fn to_json_schema(&self) -> SchemaObject {
123+
fn to_json_schema(&self, options: &ToJsonSchemaOptions) -> SchemaObject {
100124
match self {
101-
schema::ValueType::Basic(b) => b.to_json_schema(),
102-
schema::ValueType::Struct(s) => s.to_json_schema(),
125+
schema::ValueType::Basic(b) => b.to_json_schema(options),
126+
schema::ValueType::Struct(s) => s.to_json_schema(options),
103127
schema::ValueType::Collection(c) => SchemaObject {
104128
instance_type: Some(SingleOrVec::Single(Box::new(InstanceType::Array))),
105129
array: Some(Box::new(ArrayValidation {
106-
items: Some(SingleOrVec::Single(Box::new(c.row.to_json_schema().into()))),
130+
items: Some(SingleOrVec::Single(Box::new(
131+
c.row.to_json_schema(options).into(),
132+
))),
107133
..Default::default()
108134
})),
109135
..Default::default()
@@ -113,7 +139,7 @@ impl ToJsonSchema for schema::ValueType {
113139
}
114140

115141
impl ToJsonSchema for schema::EnrichedValueType {
116-
fn to_json_schema(&self) -> SchemaObject {
117-
self.typ.to_json_schema()
142+
fn to_json_schema(&self, options: &ToJsonSchemaOptions) -> SchemaObject {
143+
self.typ.to_json_schema(options)
118144
}
119145
}

src/base/schema.rs

+59-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ pub enum BasicValueType {
3535
/// A range, with a start position (inclusive) and an end position (exclusive).
3636
Range,
3737

38+
/// A UUID.
39+
Uuid,
40+
3841
/// A JSON value.
3942
Json,
4043

@@ -52,6 +55,7 @@ impl std::fmt::Display for BasicValueType {
5255
BasicValueType::Float32 => write!(f, "float32"),
5356
BasicValueType::Float64 => write!(f, "float64"),
5457
BasicValueType::Range => write!(f, "range"),
58+
BasicValueType::Uuid => write!(f, "uuid"),
5559
BasicValueType::Json => write!(f, "json"),
5660
BasicValueType::Vector(s) => write!(
5761
f,
@@ -121,7 +125,7 @@ pub struct CollectionSchema {
121125
pub row: StructSchema,
122126

123127
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
124-
pub collectors: Vec<NamedSpec<StructSchema>>,
128+
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
125129
}
126130

127131
impl CollectionSchema {
@@ -153,7 +157,7 @@ impl CollectionSchema {
153157
.iter()
154158
.map(|c| NamedSpec {
155159
name: c.name.clone(),
156-
spec: c.spec.without_attrs(),
160+
spec: Arc::from(c.spec.without_attrs()),
157161
})
158162
.collect(),
159163
}
@@ -335,13 +339,65 @@ impl std::fmt::Display for FieldSchema {
335339
}
336340
}
337341

342+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
343+
pub struct CollectorSchema {
344+
pub fields: Vec<FieldSchema>,
345+
/// If specified, the collector will have an automatically generated UUID field with the given index.
346+
pub auto_uuid_field_idx: Option<usize>,
347+
}
348+
349+
impl std::fmt::Display for CollectorSchema {
350+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351+
write!(f, "Collector(")?;
352+
for (i, field) in self.fields.iter().enumerate() {
353+
if i > 0 {
354+
write!(f, ", ")?;
355+
}
356+
write!(f, "{}", field)?;
357+
}
358+
write!(f, ")")
359+
}
360+
}
361+
362+
impl CollectorSchema {
363+
pub fn from_fields(fields: Vec<FieldSchema>, auto_uuid_field: Option<FieldName>) -> Self {
364+
let mut fields = fields;
365+
let auto_uuid_field_idx = if let Some(auto_uuid_field) = auto_uuid_field {
366+
fields.insert(
367+
0,
368+
FieldSchema::new(
369+
auto_uuid_field,
370+
EnrichedValueType {
371+
typ: ValueType::Basic(BasicValueType::Uuid),
372+
nullable: false,
373+
attrs: Default::default(),
374+
},
375+
),
376+
);
377+
Some(0)
378+
} else {
379+
None
380+
};
381+
Self {
382+
fields,
383+
auto_uuid_field_idx,
384+
}
385+
}
386+
pub fn without_attrs(&self) -> Self {
387+
Self {
388+
fields: self.fields.iter().map(|f| f.without_attrs()).collect(),
389+
auto_uuid_field_idx: self.auto_uuid_field_idx,
390+
}
391+
}
392+
}
393+
338394
/// Top-level schema for a flow instance.
339395
#[derive(Debug, Clone, Serialize, Deserialize)]
340396
pub struct DataSchema {
341397
pub schema: StructSchema,
342398

343399
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
344-
pub collectors: Vec<NamedSpec<StructSchema>>,
400+
pub collectors: Vec<NamedSpec<Arc<CollectorSchema>>>,
345401
}
346402

347403
impl Deref for DataSchema {

src/base/spec.rs

+6
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,15 @@ pub struct ForEachOpSpec {
181181
/// Emit data to a given collector at the given scope.
182182
#[derive(Debug, Clone, Serialize, Deserialize)]
183183
pub struct CollectOpSpec {
184+
/// Field values to be collected.
184185
pub input: StructMapping,
186+
/// Scope for the collector.
185187
pub scope_name: ScopeName,
188+
/// Name of the collector.
186189
pub collector_name: FieldName,
190+
/// If specified, the collector will have an automatically generated UUID field with the given name.
191+
/// The uuid will remain stable when collected input values remain unchanged.
192+
pub auto_uuid_field: Option<FieldName>,
187193
}
188194

189195
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]

0 commit comments

Comments
 (0)