Incremental (delta) update #928

Merged · 75 commits · May 15, 2025

Commits
fa09d0b  adding incremental update (ilongin, Feb 19, 2025)
99a5327  continued working on incremental (Feb 19, 2025)
f01b3a2  finixhed first test (ilongin, Feb 19, 2025)
8fa1534  added from storage incremental update test (Feb 21, 2025)
67824e6  refactoring (ilongin, Feb 21, 2025)
c11d797  merging with main (ilongin, Feb 24, 2025)
ee6640d  using delta instead of incremental (ilongin, Feb 24, 2025)
5e446b5  added check for modification (ilongin, Feb 24, 2025)
71c3469  added another test (ilongin, Feb 25, 2025)
83366aa  refactoring (ilongin, Feb 25, 2025)
a22916c  added comment (ilongin, Feb 25, 2025)
d9e4f26  split tests in new file (ilongin, Feb 25, 2025)
c622ba4  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Feb 25, 2025)
58c27f0  updated docs (ilongin, Feb 25, 2025)
ad5ee5a  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Feb 27, 2025)
079ba7a  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 3, 2025)
de026e3  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 4, 2025)
d1d066e  merged with main (ilongin, Mar 5, 2025)
046731b  added sys columns explicitly (ilongin, Mar 5, 2025)
9f52c8b  fixing delta to not have old versions in end result (ilongin, Mar 5, 2025)
802a934  added append steps (ilongin, Mar 6, 2025)
f3a7b12  fixing logic (ilongin, Mar 6, 2025)
3e7fa37  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 6, 2025)
07968ac  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 10, 2025)
d504461  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 11, 2025)
d7b8623  removed append steps from DataChain (ilongin, Mar 11, 2025)
0464c16  added better docs (ilongin, Mar 11, 2025)
8093000  removed sys flag (ilongin, Mar 12, 2025)
b505df7  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 13, 2025)
0dd71a2  fixing typo (ilongin, Mar 14, 2025)
932f64b  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 14, 2025)
b2430c4  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 18, 2025)
c02d689  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 19, 2025)
638e737  merging with main (ilongin, Mar 24, 2025)
2b29498  added alternative delta approach (ilongin, Mar 24, 2025)
626ba5a  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 25, 2025)
e1c0325  merging with main (ilongin, Mar 27, 2025)
b0958a8  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Mar 28, 2025)
400a991  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Apr 2, 2025)
59d1713  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Apr 5, 2025)
ce35a6e  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Apr 14, 2025)
e085280  fixing delta due to lazy listing changes (ilongin, Apr 15, 2025)
735af02  fixing datasetdependencies (ilongin, Apr 15, 2025)
f3ebf97  returning function (ilongin, Apr 15, 2025)
59b7666  renaming method (ilongin, Apr 15, 2025)
055ce91  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Apr 16, 2025)
eaa8dc6  merging with main (ilongin, Apr 22, 2025)
7d0a283  leaving only alternative implementation (ilongin, Apr 22, 2025)
95a206b  fixing tests (ilongin, Apr 22, 2025)
de72329  fixing tests (ilongin, Apr 23, 2025)
55269ab  fixing tests (ilongin, Apr 23, 2025)
b7b16ba  updating docs (ilongin, Apr 25, 2025)
723a1a6  not creating dataset if diff is empty (ilongin, Apr 25, 2025)
9135b2d  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Apr 28, 2025)
c670e33  adding diff persist to avoid re-calculation of diff and removing obso… (ilongin, Apr 28, 2025)
773b22d  adding count after persist to avoid re-calculating diff twice (ilongin, Apr 28, 2025)
e8de5f2  moving ad_delta to private and fixing delta docs (ilongin, Apr 28, 2025)
e8d6f2d  removing not reachable codebase (ilongin, Apr 28, 2025)
803345a  fixing lint issue (ilongin, Apr 28, 2025)
08a4c1b  added test to check num of processing calls (ilongin, Apr 28, 2025)
b0470ce  adding schema to diff instead of appending (ilongin, Apr 28, 2025)
5470104  Merge branch 'main' into ilongin/798-incremental-update (ilongin, Apr 30, 2025)
114e80a  Merge branch 'main' into ilongin/798-incremental-update (ilongin, May 6, 2025)
2ab1759  moving delta_disabled to delta.py (ilongin, May 6, 2025)
594ef7d  moved is_empty to property empty (ilongin, May 6, 2025)
567d63f  adding custom fields to calculate diff in delta update (ilongin, May 8, 2025)
5ca0689  merge with main (ilongin, May 9, 2025)
3debff1  Merge branch 'main' into ilongin/798-incremental-update (ilongin, May 9, 2025)
10b95cd  Merge branch 'main' into ilongin/798-incremental-update (ilongin, May 12, 2025)
e1f60c7  fixing semver (ilongin, May 12, 2025)
be704a2  renamed field (ilongin, May 12, 2025)
5decfeb  fixing dataset dependencies in delta update (ilongin, May 13, 2025)
ab9f9a3  fixing small issues with deleted (ilongin, May 14, 2025)
e2f5bf3  Merge branch 'main' into ilongin/798-incremental-update (ilongin, May 14, 2025)
ee27458  Merge branch 'main' into ilongin/798-incremental-update (ilongin, May 15, 2025)
17 changes: 7 additions & 10 deletions src/datachain/dataset.py
@@ -107,24 +107,21 @@ def parse(
         dataset_version: Optional[str],
         dataset_version_created_at: Optional[datetime],
     ) -> Optional["DatasetDependency"]:
-        from datachain.client import Client
-        from datachain.lib.listing import is_listing_dataset, listing_uri_from_name
+        from datachain.lib.listing import is_listing_dataset

         if not dataset_id:
             return None

         assert dataset_name is not None
-        dependency_type = DatasetDependencyType.DATASET
-        dependency_name = dataset_name
-
-        if is_listing_dataset(dataset_name):
-            dependency_type = DatasetDependencyType.STORAGE  # type: ignore[arg-type]
-            dependency_name, _ = Client.parse_url(listing_uri_from_name(dataset_name))
-
         return cls(
             id,
-            dependency_type,
-            dependency_name,
+            (
+                DatasetDependencyType.STORAGE
+                if is_listing_dataset(dataset_name)
+                else DatasetDependencyType.DATASET
+            ),
+            dataset_name,
             (
                 dataset_version  # type: ignore[arg-type]
                 if dataset_version
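For context, a standalone sketch of the behavioral nuance in this refactor: a listing dependency now keeps the dataset's own name, and only the dependency type switches, instead of rewriting the name to a parsed storage URI via Client.parse_url. The enum and helper below are stand-ins, and the lst__ prefix for listing datasets is an assumption:

from enum import Enum

class DatasetDependencyType(str, Enum):  # stand-in for datachain's enum
    DATASET = "dataset"
    STORAGE = "storage"

def is_listing_dataset(name: str) -> bool:  # stand-in for datachain.lib.listing
    return name.startswith("lst__")

def dependency_fields(dataset_name: str) -> tuple[DatasetDependencyType, str]:
    dep_type = (
        DatasetDependencyType.STORAGE
        if is_listing_dataset(dataset_name)
        else DatasetDependencyType.DATASET
    )
    return dep_type, dataset_name  # the name is no longer rewritten for listings

print(dependency_fields("lst__s3://my-bucket/"))
# (<DatasetDependencyType.STORAGE: 'storage'>, 'lst__s3://my-bucket/')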
119 changes: 119 additions & 0 deletions src/datachain/delta.py
@@ -0,0 +1,119 @@
from collections.abc import Sequence
from copy import copy
from functools import wraps
from typing import TYPE_CHECKING, Callable, Optional, TypeVar, Union

import datachain
from datachain.dataset import DatasetDependency
from datachain.error import DatasetNotFoundError

if TYPE_CHECKING:
    from typing_extensions import Concatenate, ParamSpec

    from datachain.lib.dc import DataChain

    P = ParamSpec("P")


T = TypeVar("T", bound="DataChain")


def delta_disabled(
    method: "Callable[Concatenate[T, P], T]",
) -> "Callable[Concatenate[T, P], T]":
    """
    Decorator that disables DataChain methods (e.g. `.agg()` or `.union()`)
    when working with delta updates. It throws `NotImplementedError` if the
    chain on which the method is called is marked as delta.
    """

    @wraps(method)
    def _inner(self: T, *args: "P.args", **kwargs: "P.kwargs") -> T:
        if self.delta:
            raise NotImplementedError(
                f"Delta update cannot be used with {method.__name__}"
            )
        return method(self, *args, **kwargs)

    return _inner


def _append_steps(dc: "DataChain", other: "DataChain"):
    """Returns a cloned chain with the steps of the other chain appended.
    Steps are all the modification methods applied, like filters, mappers etc.
    """
    dc = dc.clone()
    dc._query.steps += other._query.steps.copy()
    dc.signals_schema = other.signals_schema
    return dc


def delta_update(
    dc: "DataChain",
    name: str,
    on: Union[str, Sequence[str]],
    right_on: Optional[Union[str, Sequence[str]]] = None,
    compare: Optional[Union[str, Sequence[str]]] = None,
) -> tuple[Optional["DataChain"], Optional[list[DatasetDependency]], bool]:
    """
    Creates a new chain that consists of the latest version of the current
    delta dataset plus the diff from the source, with all needed modifications.
    This way we don't need to re-calculate the whole chain from the source
    again (applying all the DataChain methods like filters, mappers,
    generators etc.) but just the diff part, which is very important for
    performance.

    Note that currently delta update works only if there is exactly one direct
    dependency.
    """
    catalog = dc.session.catalog
    dc._query.apply_listing_pre_step()

    try:
        latest_version = catalog.get_dataset(name).latest_version
    except DatasetNotFoundError:
        # first creation of delta update dataset
        return None, None, True

    dependencies = catalog.get_dataset_dependencies(
        name, latest_version, indirect=False
    )

    dep = dependencies[0]
    if not dep:
        # starting dataset (e.g. listing) was removed, so we fall back to
        # normal dataset creation, as if it were created for the first time
        return None, None, True
[Codecov / codecov/patch warning: added line src/datachain/delta.py#L84 was not covered by tests]

    source_ds_name = dep.name
    source_ds_version = dep.version
    source_ds_latest_version = catalog.get_dataset(source_ds_name).latest_version
Review thread on this line:

Member: will this logic work with studio flags?

Author (ilongin): Good question. I assumed all datasets are present locally, and I think this will always be the case: the only situation where one of the dependencies is not present locally, but in Studio, is when we pull a dataset from it (we don't pull its dependencies). But when using delta update, the user is creating / saving a dataset, so this is not about pulling an existing one.

Member: so, will I be able to use something like from_dataset(studio=True, delta=True)? (and it automatically pulls the versions of the dataset that I need) .. wdyt?

Author (ilongin): So in this example:

dc.from_dataset("my-studio-ds", studio=True, delta=True).filter(...).save("my-delta-ds")

this will happen:

  1. On the first run, "my-studio-ds" will be pulled and the first version of "my-delta-ds" will be created normally (without any delta logic, as it's created for the first time).
  2. On the second run (and all subsequent runs) we will already have "my-studio-ds" locally, and it will diff its latest local version against the version recorded in "my-delta-ds" dependencies. If a new version is created in Studio it won't have any effect, since we have the dataset locally (and maybe some newer local versions of it as well) and we look at that - this is not related to delta update itself, but to how dataset pulling and .from_dataset(studio=True) work in general.

Member: what happens if someone else is running this code and there are already a few versions of the dataset on Studio?

Author (ilongin): It will be the same: the latest version of my-studio-ds would be fetched locally, and from then on it would use that local dataset.

Member: so, should we actually check / pull two versions from Studio in this case (initial run) - to be able to get the diff?

Member: so, this one is also not resolved yet ... can be a followup, but it seems like an issue for me
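A condensed sketch of the flow described in this thread. The from_dataset(studio=True, delta=True) entry point is taken from the discussion and may not match the final public API; the filter step is a placeholder:

import datachain as dc

chain = (
    dc.from_dataset("my-studio-ds", studio=True, delta=True)
    .filter(dc.C("file.size") > 0)  # placeholder; any filters/mappers apply
    .save("my-delta-ds")
)
# Run 1: pulls "my-studio-ds" and creates "my-delta-ds" v1 with no delta logic.
# Run 2+: diffs the local latest "my-studio-ds" against the version recorded
# in "my-delta-ds" dependencies, re-applies the chain steps to the diff only,
# and unions the result with the previous "my-delta-ds" version.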

    dependencies = copy(dependencies)
    dependencies = [d for d in dependencies if d is not None]  # filter out removed dep
    dependencies[0].version = source_ds_latest_version  # type: ignore[union-attr]

    source_dc = datachain.read_dataset(source_ds_name, source_ds_version)
    source_dc_latest = datachain.read_dataset(source_ds_name, source_ds_latest_version)

    diff = source_dc_latest.compare(source_dc, on=on, compare=compare, deleted=False)
    # We append all the steps from the original chain to the diff,
    # e.g. filters and mappers.
    diff = _append_steps(diff, dc)

    # to avoid re-calculating diff multiple times
    diff = diff.persist()

    if diff.empty:
        return None, None, False

    # merging diff and the latest version of dataset
    delta_chain = (
        datachain.read_dataset(name, latest_version)
        .compare(
            diff,
            on=right_on or on,
            added=True,
            modified=False,
            deleted=False,
        )
        .union(diff)
    )

    return delta_chain, dependencies, True  # type: ignore[return-value]
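For orientation, a minimal end-to-end sketch of what this module enables. The delta/delta_on flags on read_storage are assumptions based on this PR's direction rather than a confirmed signature, and the bucket path is illustrative:

import datachain as dc

def describe(file: dc.File) -> str:
    # expensive per-file work; with delta update it runs only on new/changed files
    return file.path.upper()

(
    dc.read_storage("s3://my-bucket/images/", delta=True, delta_on="file.path")
    .map(description=describe)
    .save("images-described")
)
# First save: processes everything and creates version 1.
# Later saves: delta_update() diffs the source listing, applies .map() to the
# diff only, and unions it with the previous "images-described" version.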
14 changes: 10 additions & 4 deletions src/datachain/diff/__init__.py

@@ -30,7 +30,7 @@
 SAME = "S"


-def _compare(  # noqa: C901
+def _compare(  # noqa: C901, PLR0912
     left: "DataChain",
     right: "DataChain",
     on: Union[str, Sequence[str]],
@@ -77,14 +77,16 @@
     cols_select = list(left.signals_schema.clone_without_sys_signals().values.keys())

     # getting correct on and right_on column names
+    on_ = on
     on = left.signals_schema.resolve(*on).db_signals()  # type: ignore[assignment]
-    right_on = right.signals_schema.resolve(*(right_on or on)).db_signals()  # type: ignore[assignment]
+    right_on = right.signals_schema.resolve(*(right_on or on_)).db_signals()  # type: ignore[assignment]

     # getting correct compare and right_compare column names if they are defined
     if compare:
+        compare_ = compare
         compare = left.signals_schema.resolve(*compare).db_signals()  # type: ignore[assignment]
         right_compare = right.signals_schema.resolve(
-            *(right_compare or compare)
+            *(right_compare or compare_)
         ).db_signals()  # type: ignore[assignment]
     elif not compare and len(cols) != len(right_cols):
         # here we will mark all rows that are not added or deleted as modified since
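The on_ / compare_ aliases fix a subtle shadowing bug: on was reassigned to resolved DB column names before being reused as the fallback for right_on, so the fallback tried to resolve already-resolved names. A standalone illustration of the pattern, with a toy resolve function that is not DataChain code:

from typing import Optional

def resolve(schema: dict[str, str], names: list[str]) -> list[str]:
    # maps user-facing signal names to backing DB column names
    return [schema[n] for n in names]

LEFT = {"id": "col_id"}
RIGHT = {"id": "col_id"}

def buggy(on: list[str], right_on: Optional[list[str]] = None) -> list[str]:
    on = resolve(LEFT, on)  # `on` now holds DB names...
    return resolve(RIGHT, right_on or on)  # ...fallback re-resolves them: KeyError

def fixed(on: list[str], right_on: Optional[list[str]] = None) -> list[str]:
    on_ = on  # keep the user-facing names around for the fallback
    on = resolve(LEFT, on)
    return resolve(RIGHT, right_on or on_)

print(fixed(["id"]))  # ['col_id']
# buggy(["id"]) raises KeyError: 'col_id'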
@@ -155,7 +157,11 @@
     if status_col:
         cols_select.append(diff_col)

-    dc_diff = dc_diff.select(*cols_select)
+    if not dc_diff._sys:
+        # TODO workaround when sys signal is not available in diff
+        dc_diff = dc_diff.settings(sys=True).select(*cols_select).settings(sys=False)
+    else:
+        dc_diff = dc_diff.select(*cols_select)
Review thread on this workaround:

Member: what does it mean?

Author (ilongin): After this change I had some issues in the diff codebase (it stopped working for some cases). I tried to investigate, but it took longer than expected, so as a temporary fix I added a workaround and wanted to create an issue to understand the real cause. Looks like I just put the TODO and forgot to create the issue - will do it now.

Member: is it fixed? can we clean this up?

Author (ilongin): No, I will create a separate issue to deal with this later on. I don't think it's high priority, as this workaround works fine for now. It's tricky to fix, though, so that's why I don't want to spend too much time on it now.

Member: when does it happen? if we have an issue with context - let's put a link in this todo please ... it should be as clear as possible from the context what is happening here (and why). Also, what is sys=True - it's not documented, it seems (why?)

[Codecov / codecov/patch warning: added line src/datachain/diff/__init__.py#L164 was not covered by tests]

     # final schema is schema from the left chain with status column added if needed
     dc_diff.signals_schema = (
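A hedged sketch of the workaround's shape. It assumes settings(sys=True) temporarily attaches DataChain's hidden sys signals (e.g. sys.id) so select can resolve them, and settings(sys=False) hides them again in the result; read_values is used here only to build a toy chain:

import datachain as dc

chain = dc.read_values(num=[1, 2, 3])
# select while sys signals are attached, then drop them from the result schema
with_sys = chain.settings(sys=True).select("num").settings(sys=False)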
77 changes: 75 additions & 2 deletions src/datachain/lib/dc/datachain.py

@@ -25,6 +25,7 @@

 from datachain import semver
 from datachain.dataset import DatasetRecord
+from datachain.delta import delta_disabled, delta_update
 from datachain.func import literal
 from datachain.func.base import Function
 from datachain.func.func import Func
@@ -72,6 +73,9 @@
 P = ParamSpec("P")


+T = TypeVar("T", bound="DataChain")
+
+
 class DataChain:
     """DataChain - a data structure for batch data processing and evaluation.
@@ -164,6 +168,7 @@
         self.signals_schema = signal_schema
         self._setup: dict = setup or {}
         self._sys = _sys
+        self._delta = False

     def __repr__(self) -> str:
         """Return a string representation of the chain."""
@@ -177,6 +182,32 @@
         self.print_schema(file=file)
         return file.getvalue()

+    def _as_delta(
+        self,
+        on: Optional[Union[str, Sequence[str]]] = None,
+        right_on: Optional[Union[str, Sequence[str]]] = None,
+        compare: Optional[Union[str, Sequence[str]]] = None,
+    ) -> "Self":
+        """Marks this chain as delta, which means a special delta process will
+        be called on dataset saving for optimization."""
+        if on is None:
+            raise ValueError("'delta on' fields must be defined")
+
[Codecov / codecov/patch warning: added line src/datachain/lib/dc/datachain.py#L194 was not covered by tests]
+        self._delta = True
+        self._delta_on = on
+        self._delta_result_on = right_on
+        self._delta_compare = compare
+        return self
+
+    @property
+    def empty(self) -> bool:
+        """Returns True if the chain has zero rows."""
+        return not bool(self.count())
+
+    @property
+    def delta(self) -> bool:
Review thread on this property:

Member: is it public? do we want it to be public?

Author (ilongin): I don't see a reason why it shouldn't be public. A user could check whether some chain is in "delta" mode or not. It is also used in some other internal methods, for which it doesn't need to be public. I can make it private as well; I don't have a strong opinion.

Member: no, that's fine .. but if we keep it public we need proper docs for it then ... and an example if you have something in mind

Member: one minor thing that is left here @ilongin ... let's please take care of it

"""Returns True if this chain is ran in "delta" update mode"""
return self._delta

     @property
     def schema(self) -> dict[str, DataType]:
         """Get schema of the chain."""
@@ -254,9 +285,17 @@
         signal_schema = copy.deepcopy(self.signals_schema)
         if _sys is None:
             _sys = self._sys
-        return type(self)(
+        chain = type(self)(
             query, settings, signal_schema=signal_schema, setup=self._setup, _sys=_sys
         )
+        if self.delta:
+            chain = chain._as_delta(
+                on=self._delta_on,
+                right_on=self._delta_result_on,
+                compare=self._delta_compare,
+            )
+
+        return chain

     def settings(
         self,
@@ -463,7 +502,7 @@
         attrs: Optional[list[str]] = None,
         update_version: Optional[str] = "patch",
         **kwargs,
-    ) -> "Self":
+    ) -> "DataChain":
         """Save to a Dataset. It returns the chain itself.

         Parameters:
@@ -490,6 +529,35 @@
         )

         schema = self.signals_schema.clone_without_sys_signals().serialize()
+        if self.delta and name:
+            delta_ds, dependencies, has_changes = delta_update(
+                self,
+                name,
+                on=self._delta_on,
+                right_on=self._delta_result_on,
+                compare=self._delta_compare,
+            )
+
+            if delta_ds:
+                return self._evolve(
+                    query=delta_ds._query.save(
+                        name=name,
+                        version=version,
+                        feature_schema=schema,
+                        dependencies=dependencies,
+                        **kwargs,
+                    )
+                )
+
+            if not has_changes:
+                # Sources have not changed, so a new version of the resulting
+                # dataset would be identical to the previous one. To avoid
+                # duplicating datasets, we don't create a new version and
+                # return the current latest version instead.
+                from .datasets import read_dataset
+
+                return read_dataset(name, **kwargs)
+
         return self._evolve(
             query=self._query.save(
                 name=name,
@@ -615,6 +683,7 @@
             signal_schema=udf_obj.output,
         )

+    @delta_disabled
     def agg(
         self,
         func: Optional[Callable] = None,
@@ -768,6 +837,7 @@

         return self._evolve(query=self._query.order_by(*args))

+    @delta_disabled
     def distinct(self, arg: str, *args: str) -> "Self":  # type: ignore[override]
         """Removes duplicate rows based on uniqueness of some input column(s),
         i.e. if rows are found with the same value of input column(s), only one
@@ -802,6 +872,7 @@
             query=self._query.select(*columns), signal_schema=new_schema
         )

+    @delta_disabled  # type: ignore[arg-type]
     def group_by(
         self,
         *,
@@ -1160,6 +1231,7 @@
         schema = self.signals_schema.clone_without_file_signals()
         return self.select(*schema.values.keys())

+    @delta_disabled
     def merge(
         self,
         right_ds: "DataChain",
@@ -1268,6 +1340,7 @@

         return ds

+    @delta_disabled
     def union(self, other: "Self") -> "Self":
         """Return the set union of the two datasets.
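To illustrate the guardrails these decorators add, a hedged sketch; the delta flags on read_storage are assumed from this PR's direction and the paths are illustrative:

import datachain as dc

chain = dc.read_storage("s3://my-bucket/docs/", delta=True, delta_on="file.path")
print(chain.delta)  # True - the chain is marked for delta processing

try:
    chain.union(dc.read_storage("s3://my-bucket/other/"))
except NotImplementedError as exc:
    # raised by @delta_disabled: union/merge/agg/distinct/group_by cannot
    # participate in a delta-updated chain
    print(exc)  # "Delta update cannot be used with union"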