Skip to content

Revise setup logic: simplification and correctly handle target_id #214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 25 additions & 29 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,10 +885,8 @@ impl AnalyzerContext<'_> {
}
};

let target_id: i32 = 1; // TODO: Fill it with a meaningful value automatically
let ((setup_key, desired_state), executor_fut) = export_factory.clone().build(
export_op.name.clone(),
target_id,
spec,
key_fields_schema,
value_fields_schema,
Expand All @@ -902,44 +900,42 @@ impl AnalyzerContext<'_> {
let existing_target_states = existing_target_states.get(&resource_id);
let target_id = setup_state
.map(|setup_state| -> Result<i32> {
let existing_target_ids = existing_target_states
.iter()
.flat_map(|v| v.iter())
.map(|state| state.common.target_id)
.collect::<HashSet<_>>();
let target_id = if existing_target_ids.len() == 1 {
existing_target_ids.into_iter().next().unwrap()
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
let compatibility = export_factory
.check_state_compatibility(&desired_state, &existing_state.state)?;
let compatible_target_id =
if compatibility != SetupStateCompatibility::NotCompatible {
reusable_schema_version_ids.insert(
(compatibility == SetupStateCompatibility::Compatible)
.then_some(existing_state.common.schema_version_id),
);
Some(existing_state.common.target_id)
} else {
None
};
compatible_target_ids.insert(compatible_target_id);
}

let target_id = if compatible_target_ids.len() == 1 {
compatible_target_ids.into_iter().next().flatten()
} else {
if existing_target_ids.len() > 1 {
if compatible_target_ids.len() > 1 {
warn!("Multiple target states with the same key schema found");
}
None
};
let target_id = target_id.unwrap_or_else(|| {
setup_state.metadata.last_target_id += 1;
setup_state.metadata.last_target_id
};
});
let max_schema_version_id = existing_target_states
.iter()
.flat_map(|v| v.iter())
.map(|s| s.common.max_schema_version_id)
.max()
.unwrap_or(0);
let reusable_schema_version_ids = existing_target_states
.iter()
.flat_map(|v| v.iter())
.map(|s| {
Ok({
if export_factory.will_keep_all_existing_data(
&export_op.name,
target_id,
&desired_state,
&s.state,
)? {
Some(s.common.schema_version_id)
} else {
None
}
})
})
.collect::<Result<HashSet<_>>>()?;
let schema_version_id = if reusable_schema_version_ids.len() == 1 {
reusable_schema_version_ids
.into_iter()
Expand Down
1 change: 0 additions & 1 deletion src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,6 @@ impl FlowBuilder {
) -> PyResult<()> {
let common_scope = Self::minimum_common_scope(fields.iter().map(|(_, ds)| &ds.scope), None)
.into_py_result()?;
let has_auto_uuid_field = auto_uuid_field.is_some();
let name = format!(".collect.{}", self.next_generated_op_id);
self.next_generated_op_id += 1;
self.do_in_scope(
Expand Down
19 changes: 5 additions & 14 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
fn build(
self: Arc<Self>,
name: String,
target_id: i32,
spec: Self::Spec,
key_fields_schema: Vec<FieldSchema>,
value_fields_schema: Vec<FieldSchema>,
Expand All @@ -301,13 +300,11 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
impl setup::ResourceSetupStatusCheck<Key = Self::Key, State = Self::SetupState> + 'static,
>;

fn will_keep_all_existing_data(
fn check_state_compatibility(
&self,
name: &str,
target_id: i32,
desired_state: &Self::SetupState,
existing_state: &Self::SetupState,
) -> Result<bool>;
) -> Result<SetupStateCompatibility>;

fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
where
Expand Down Expand Up @@ -384,7 +381,6 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
fn build(
self: Arc<Self>,
name: String,
target_id: i32,
spec: serde_json::Value,
key_fields_schema: Vec<FieldSchema>,
value_fields_schema: Vec<FieldSchema>,
Expand All @@ -398,7 +394,6 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
let ((setup_key, setup_state), executors) = StorageFactoryBase::build(
self,
name,
target_id,
spec,
key_fields_schema,
value_fields_schema,
Expand Down Expand Up @@ -438,17 +433,13 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
)?))
}

fn will_keep_all_existing_data(
fn check_state_compatibility(
&self,
name: &str,
target_id: i32,
desired_state: &serde_json::Value,
existing_state: &serde_json::Value,
) -> Result<bool> {
let result = StorageFactoryBase::will_keep_all_existing_data(
) -> Result<SetupStateCompatibility> {
let result = StorageFactoryBase::check_state_compatibility(
self,
name,
target_id,
&serde_json::from_value(desired_state.clone())?,
&serde_json::from_value(existing_state.clone())?,
)?;
Expand Down
19 changes: 14 additions & 5 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,24 @@ pub trait ExportTargetExecutor: Send + Sync {
async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()>;
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SetupStateCompatibility {
/// The resource is fully compatible with the desired state.
/// This means the resource can be updated to the desired state without any loss of data.
Compatible,
/// The resource is partially compatible with the desired state.
/// This means some existing data will be lost after applying the setup change.
PartialCompatible,
/// The resource needs to be rebuilt
NotCompatible,
}

pub trait ExportTargetFactory {
// The first field of the `input_schema` is the primary key field.
// If it has struct type, it should be converted to composite primary key.
fn build(
self: Arc<Self>,
name: String,
target_id: i32,
spec: serde_json::Value,
key_fields_schema: Vec<FieldSchema>,
value_fields_schema: Vec<FieldSchema>,
Expand All @@ -116,13 +127,11 @@ pub trait ExportTargetFactory {
>,
>;

fn will_keep_all_existing_data(
fn check_state_compatibility(
&self,
name: &str,
target_id: i32,
desired_state: &serde_json::Value,
existing_state: &serde_json::Value,
) -> Result<bool>;
) -> Result<SetupStateCompatibility>;
}

#[derive(Clone)]
Expand Down
33 changes: 20 additions & 13 deletions src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,6 @@ impl StorageFactoryBase for Arc<Factory> {
fn build(
self: Arc<Self>,
name: String,
target_id: i32,
spec: Spec,
key_fields_schema: Vec<FieldSchema>,
value_fields_schema: Vec<FieldSchema>,
Expand All @@ -906,9 +905,9 @@ impl StorageFactoryBase for Arc<Factory> {
)> {
let table_id = TableId {
database_url: spec.database_url.clone(),
table_name: spec.table_name.unwrap_or_else(|| {
format!("{}__{}__{}", context.flow_instance_name, name, target_id)
}),
table_name: spec
.table_name
.unwrap_or_else(|| format!("{}__{}", context.flow_instance_name, name)),
};
let setup_state = SetupState::new(
&table_id,
Expand Down Expand Up @@ -943,22 +942,30 @@ impl StorageFactoryBase for Arc<Factory> {
Ok(SetupStatusCheck::new(self.clone(), key, desired, existing))
}

fn will_keep_all_existing_data(
fn check_state_compatibility(
&self,
_name: &str,
_target_id: i32,
desired: &SetupState,
existing: &SetupState,
) -> Result<bool> {
let result = existing
.key_fields_schema
.iter()
.all(|(k, v)| desired.key_fields_schema.get(k) == Some(v))
) -> Result<SetupStateCompatibility> {
let is_key_identical = existing.key_fields_schema.len() == desired.key_fields_schema.len()
&& existing
.key_fields_schema
.iter()
.all(|(k, v)| desired.key_fields_schema.get(k) == Some(v));
let compatibility = if is_key_identical {
let is_value_lossy = existing
.value_fields_schema
.iter()
.any(|(k, v)| desired.value_fields_schema.get(k) != Some(v));
Ok(result)
if is_value_lossy {
SetupStateCompatibility::PartialCompatible
} else {
SetupStateCompatibility::Compatible
}
} else {
SetupStateCompatibility::NotCompatible
};
Ok(compatibility)
}
}

Expand Down