Skip to content

Commit b09af0d

Browse files
authored
Revise setup logic: simplification and correctly handle target_id (#214)
1 parent 88e0fe3 commit b09af0d

File tree

5 files changed

+64
-62
lines changed

5 files changed

+64
-62
lines changed

src/builder/analyzer.rs

+25-29
Original file line numberDiff line numberDiff line change
@@ -885,10 +885,8 @@ impl AnalyzerContext<'_> {
885885
}
886886
};
887887

888-
let target_id: i32 = 1; // TODO: Fill it with a meaningful value automatically
889888
let ((setup_key, desired_state), executor_fut) = export_factory.clone().build(
890889
export_op.name.clone(),
891-
target_id,
892890
spec,
893891
key_fields_schema,
894892
value_fields_schema,
@@ -902,44 +900,42 @@ impl AnalyzerContext<'_> {
902900
let existing_target_states = existing_target_states.get(&resource_id);
903901
let target_id = setup_state
904902
.map(|setup_state| -> Result<i32> {
905-
let existing_target_ids = existing_target_states
906-
.iter()
907-
.flat_map(|v| v.iter())
908-
.map(|state| state.common.target_id)
909-
.collect::<HashSet<_>>();
910-
let target_id = if existing_target_ids.len() == 1 {
911-
existing_target_ids.into_iter().next().unwrap()
903+
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
904+
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
905+
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
906+
let compatibility = export_factory
907+
.check_state_compatibility(&desired_state, &existing_state.state)?;
908+
let compatible_target_id =
909+
if compatibility != SetupStateCompatibility::NotCompatible {
910+
reusable_schema_version_ids.insert(
911+
(compatibility == SetupStateCompatibility::Compatible)
912+
.then_some(existing_state.common.schema_version_id),
913+
);
914+
Some(existing_state.common.target_id)
915+
} else {
916+
None
917+
};
918+
compatible_target_ids.insert(compatible_target_id);
919+
}
920+
921+
let target_id = if compatible_target_ids.len() == 1 {
922+
compatible_target_ids.into_iter().next().flatten()
912923
} else {
913-
if existing_target_ids.len() > 1 {
924+
if compatible_target_ids.len() > 1 {
914925
warn!("Multiple target states with the same key schema found");
915926
}
927+
None
928+
};
929+
let target_id = target_id.unwrap_or_else(|| {
916930
setup_state.metadata.last_target_id += 1;
917931
setup_state.metadata.last_target_id
918-
};
932+
});
919933
let max_schema_version_id = existing_target_states
920934
.iter()
921935
.flat_map(|v| v.iter())
922936
.map(|s| s.common.max_schema_version_id)
923937
.max()
924938
.unwrap_or(0);
925-
let reusable_schema_version_ids = existing_target_states
926-
.iter()
927-
.flat_map(|v| v.iter())
928-
.map(|s| {
929-
Ok({
930-
if export_factory.will_keep_all_existing_data(
931-
&export_op.name,
932-
target_id,
933-
&desired_state,
934-
&s.state,
935-
)? {
936-
Some(s.common.schema_version_id)
937-
} else {
938-
None
939-
}
940-
})
941-
})
942-
.collect::<Result<HashSet<_>>>()?;
943939
let schema_version_id = if reusable_schema_version_ids.len() == 1 {
944940
reusable_schema_version_ids
945941
.into_iter()

src/builder/flow_builder.rs

-1
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,6 @@ impl FlowBuilder {
523523
) -> PyResult<()> {
524524
let common_scope = Self::minimum_common_scope(fields.iter().map(|(_, ds)| &ds.scope), None)
525525
.into_py_result()?;
526-
let has_auto_uuid_field = auto_uuid_field.is_some();
527526
let name = format!(".collect.{}", self.next_generated_op_id);
528527
self.next_generated_op_id += 1;
529528
self.do_in_scope(

src/ops/factory_bases.rs

+5-14
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,6 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
281281
fn build(
282282
self: Arc<Self>,
283283
name: String,
284-
target_id: i32,
285284
spec: Self::Spec,
286285
key_fields_schema: Vec<FieldSchema>,
287286
value_fields_schema: Vec<FieldSchema>,
@@ -301,13 +300,11 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
301300
impl setup::ResourceSetupStatusCheck<Key = Self::Key, State = Self::SetupState> + 'static,
302301
>;
303302

304-
fn will_keep_all_existing_data(
303+
fn check_state_compatibility(
305304
&self,
306-
name: &str,
307-
target_id: i32,
308305
desired_state: &Self::SetupState,
309306
existing_state: &Self::SetupState,
310-
) -> Result<bool>;
307+
) -> Result<SetupStateCompatibility>;
311308

312309
fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()>
313310
where
@@ -384,7 +381,6 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
384381
fn build(
385382
self: Arc<Self>,
386383
name: String,
387-
target_id: i32,
388384
spec: serde_json::Value,
389385
key_fields_schema: Vec<FieldSchema>,
390386
value_fields_schema: Vec<FieldSchema>,
@@ -398,7 +394,6 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
398394
let ((setup_key, setup_state), executors) = StorageFactoryBase::build(
399395
self,
400396
name,
401-
target_id,
402397
spec,
403398
key_fields_schema,
404399
value_fields_schema,
@@ -438,17 +433,13 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
438433
)?))
439434
}
440435

441-
fn will_keep_all_existing_data(
436+
fn check_state_compatibility(
442437
&self,
443-
name: &str,
444-
target_id: i32,
445438
desired_state: &serde_json::Value,
446439
existing_state: &serde_json::Value,
447-
) -> Result<bool> {
448-
let result = StorageFactoryBase::will_keep_all_existing_data(
440+
) -> Result<SetupStateCompatibility> {
441+
let result = StorageFactoryBase::check_state_compatibility(
449442
self,
450-
name,
451-
target_id,
452443
&serde_json::from_value(desired_state.clone())?,
453444
&serde_json::from_value(existing_state.clone())?,
454445
)?;

src/ops/interface.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,24 @@ pub trait ExportTargetExecutor: Send + Sync {
8686
async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()>;
8787
}
8888

89+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
90+
pub enum SetupStateCompatibility {
91+
/// The resource is fully compatible with the desired state.
92+
/// This means the resource can be updated to the desired state without any loss of data.
93+
Compatible,
94+
/// The resource is partially compatible with the desired state.
95+
/// This means some existing data will be lost after applying the setup change.
96+
PartialCompatible,
97+
/// The resource needs to be rebuilt
98+
NotCompatible,
99+
}
100+
89101
pub trait ExportTargetFactory {
90102
// The first field of the `input_schema` is the primary key field.
91103
// If it has struct type, it should be converted to composite primary key.
92104
fn build(
93105
self: Arc<Self>,
94106
name: String,
95-
target_id: i32,
96107
spec: serde_json::Value,
97108
key_fields_schema: Vec<FieldSchema>,
98109
value_fields_schema: Vec<FieldSchema>,
@@ -116,13 +127,11 @@ pub trait ExportTargetFactory {
116127
>,
117128
>;
118129

119-
fn will_keep_all_existing_data(
130+
fn check_state_compatibility(
120131
&self,
121-
name: &str,
122-
target_id: i32,
123132
desired_state: &serde_json::Value,
124133
existing_state: &serde_json::Value,
125-
) -> Result<bool>;
134+
) -> Result<SetupStateCompatibility>;
126135
}
127136

128137
#[derive(Clone)]

src/ops/storages/postgres.rs

+20-13
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,6 @@ impl StorageFactoryBase for Arc<Factory> {
894894
fn build(
895895
self: Arc<Self>,
896896
name: String,
897-
target_id: i32,
898897
spec: Spec,
899898
key_fields_schema: Vec<FieldSchema>,
900899
value_fields_schema: Vec<FieldSchema>,
@@ -906,9 +905,9 @@ impl StorageFactoryBase for Arc<Factory> {
906905
)> {
907906
let table_id = TableId {
908907
database_url: spec.database_url.clone(),
909-
table_name: spec.table_name.unwrap_or_else(|| {
910-
format!("{}__{}__{}", context.flow_instance_name, name, target_id)
911-
}),
908+
table_name: spec
909+
.table_name
910+
.unwrap_or_else(|| format!("{}__{}", context.flow_instance_name, name)),
912911
};
913912
let setup_state = SetupState::new(
914913
&table_id,
@@ -943,22 +942,30 @@ impl StorageFactoryBase for Arc<Factory> {
943942
Ok(SetupStatusCheck::new(self.clone(), key, desired, existing))
944943
}
945944

946-
fn will_keep_all_existing_data(
945+
fn check_state_compatibility(
947946
&self,
948-
_name: &str,
949-
_target_id: i32,
950947
desired: &SetupState,
951948
existing: &SetupState,
952-
) -> Result<bool> {
953-
let result = existing
954-
.key_fields_schema
955-
.iter()
956-
.all(|(k, v)| desired.key_fields_schema.get(k) == Some(v))
949+
) -> Result<SetupStateCompatibility> {
950+
let is_key_identical = existing.key_fields_schema.len() == desired.key_fields_schema.len()
957951
&& existing
952+
.key_fields_schema
953+
.iter()
954+
.all(|(k, v)| desired.key_fields_schema.get(k) == Some(v));
955+
let compatibility = if is_key_identical {
956+
let is_value_lossy = existing
958957
.value_fields_schema
959958
.iter()
960959
.any(|(k, v)| desired.value_fields_schema.get(k) != Some(v));
961-
Ok(result)
960+
if is_value_lossy {
961+
SetupStateCompatibility::PartialCompatible
962+
} else {
963+
SetupStateCompatibility::Compatible
964+
}
965+
} else {
966+
SetupStateCompatibility::NotCompatible
967+
};
968+
Ok(compatibility)
962969
}
963970
}
964971

0 commit comments

Comments
 (0)