
Incremental (delta) update #928

Open — wants to merge 23 commits into main
Conversation

@ilongin (Contributor) commented Feb 19, 2025

Adding the ability to do incremental (delta) updates with a dataset. The idea is not to re-build the whole dataset from the source once the source has changed (e.g. new or modified files in an S3 bucket), but to create a diff chain between the last version of the dataset and the source, apply all the chain methods that were used to build the original chain from the source (mappers, filters, etc.), and then merge the diff with the last version of the dataset. This gives much better performance.

To run a delta update, the user simply re-runs the whole script that creates the dataset, with one small modification: adding delta=True to the DataChain.save() call.

Example:

from datachain import C, DataChain, File, func

def my_embedding(file: File) -> list[float]:
    return [0.5, 0.5]

(
    DataChain.from_storage(
        "s3://ldb-public/remote/data-lakes/dogs-and-cats/", type="image", anon=True
    )
    .filter(C.name.glob("*cat*"))
    .filter(C.name.glob("*.jpg"))
    .map(emb=my_embedding)
    .mutate(dist=func.cosine_distance("emb", (0.1, 0.2)))
    .save("cats", delta=True)
)

@ilongin ilongin marked this pull request as draft February 19, 2025 15:13
@ilongin ilongin linked an issue Feb 19, 2025 that may be closed by this pull request

cloudflare-workers-and-pages bot commented Feb 19, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: 3e7fa37
Status: ✅  Deploy successful!
Preview URL: https://217ce101.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-798-incremental-upda.datachain-documentation.pages.dev

View logs

@dmpetrov (Member) commented:

@ilongin it would be great to extract all logic outside of the fat file dc.py to increment.py or dc_incremental.py

Also, should we call it incremental or delta? :) Delta seems better, but I don't like it due to the conflict with Delta Lake. Any ideas? :)


@ilongin (Contributor, Author) commented Feb 21, 2025

> @ilongin it would be great to extract all logic outside of the fat file dc.py to increment.py or dc_incremental.py
>
> Also, should we call it incremental or delta? :) Delta seems better, but I don't like it due to the conflict with Delta Lake. Any ideas? :)

@dmpetrov one question just to be 100% sure: how do we handle the different statuses (added, modified, removed, same)?

My assumption is to:

  1. Added records are appended to the previous dataset (its current last version).
  2. Modified records replace the matching records from the previous dataset in the new dataset.
  3. Deleted records → do nothing for now, though maybe we should remove them from the new dataset?
  4. Same records → nothing to do here.

Currently DataChain.diff() returns only added and changed records by default; for the other statuses, explicit flags must be set.

Regarding the name: delta makes more sense if we are not just appending new records; otherwise it's more like incremental. I don't have a strong opinion here, both sound reasonable to me.
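To make the four statuses concrete, here is a minimal self-contained sketch of the merge logic described above, using plain dicts keyed by file path. This is illustrative only, not the actual DataChain implementation; `apply_delta` and `remove_deleted` are hypothetical names.

```python
def apply_delta(previous, added, modified, deleted, remove_deleted=False):
    """Merge diff statuses into a new dataset version (illustrative sketch)."""
    result = dict(previous)
    result.update(added)        # 1. added records are appended
    result.update(modified)     # 2. modified records replace matched ones
    if remove_deleted:          # 3. deleted records are optionally dropped
        for path in deleted:
            result.pop(path, None)
    # 4. "same" records need no action: they are carried over from `previous`
    return result

prev = {"cat1.jpg": "v1", "cat2.jpg": "v1", "cat3.jpg": "v1"}
new_version = apply_delta(
    prev,
    added={"cat4.jpg": "v1"},
    modified={"cat2.jpg": "v2"},
    deleted=["cat3.jpg"],
    remove_deleted=True,
)
# new_version: cat1 unchanged, cat2 replaced, cat3 removed, cat4 appended
```

With `remove_deleted=False` (matching the current diff() default of ignoring deletions), deleted records simply stay in the new version.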


@dmpetrov (Member) commented:

> Currently DataChain.diff() returns only added and changed records by default...

Let's use the same default for the incremental update.

> delta makes more sense

Then let's use Delta 🙂


codecov bot commented Feb 24, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 87.74%. Comparing base (ca27757) to head (3e7fa37).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #928      +/-   ##
==========================================
+ Coverage   87.65%   87.74%   +0.08%     
==========================================
  Files         131      132       +1     
  Lines       11896    11923      +27     
  Branches     1622     1625       +3     
==========================================
+ Hits        10428    10462      +34     
+ Misses       1058     1053       -5     
+ Partials      410      408       -2     
Flag Coverage Δ
datachain 87.67% <100.00%> (+0.08%) ⬆️


@ilongin ilongin marked this pull request as ready for review February 25, 2025 15:34
@ilongin ilongin changed the title Incremental update Incremental (delta) update Feb 27, 2025
@amritghimire amritghimire requested a review from Copilot February 28, 2025 14:14


PR Overview

This PR introduces incremental (delta) update functionality to optimize dataset updates by computing and merging diffs rather than re-processing the entire source, and it updates related tests and supporting modules accordingly.

  • Implements a new delta_update function in the core module.
  • Adds functional and unit tests covering delta updates from both datasets and storage.
  • Updates the save and query methods (and related documentation) to support delta processing and refactors file signal retrieval.

Reviewed Changes

| File | Description |
| --- | --- |
| src/datachain/delta.py | Adds delta_update for incremental diff computation. |
| tests/func/test_delta.py | Introduces tests to validate delta updates from various sources. |
| tests/unit/lib/test_signal_schema.py | Adds unit tests for file signal retrieval. |
| src/datachain/lib/dc.py | Updates the save method to allow a delta flag. |
| src/datachain/query/dataset.py | Refactors StartingStep to QueryStep; updates type annotation. |
| src/datachain/lib/signal_schema.py | Replaces contains_file() with get_file_signal() for clarity. |

Copilot reviewed 6 out of 6 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (2)

src/datachain/delta.py:36

  • Consider reviewing the logic of appending the original chain's steps onto the diff chain to ensure that the order of applied transformations is correct and consistent with user expectations.
diff._query.steps += dc._query.steps

tests/func/test_delta.py:146

  • Duplicate file entries (e.g., 'images/img6.jpg' and 'images/img8.jpg' appear twice) in the expected output may indicate an unintended behavior in the union operation. Please confirm whether duplicates are intentional or if filtering/aggregation is needed.
"images/img6.jpg",
@dreadatour (Contributor) left a comment:

Looks good to me 👍🔥



def test_get_file_signal():
    assert SignalSchema({"name": str, "f": File}).get_file_signal() == "f"
Contributor:
The reason will be displayed to describe this comment to others. Learn more.

Should we also test for nested file signal or is it out of the scope of this method?

E.g.:

class CustomModel(DataModel):
    file: File
    foo: str
    bar: float

assert SignalSchema({"name": str, "custom": CustomModel}).get_file_signal() == "custom.file"

Contributor (Author):

For now it only works for top-level file objects. In the future we can add nested support if needed.
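If nested support is added later, the lookup could recurse into model fields and return a dotted path like `custom.file`, as suggested above. Here is a self-contained sketch using dataclasses as stand-ins for DataModel and SignalSchema; the real API may differ.

```python
from dataclasses import dataclass, fields, is_dataclass

@dataclass
class File:
    path: str = ""

@dataclass
class CustomModel:
    file: File = None
    foo: str = ""
    bar: float = 0.0

def get_file_signal(schema):
    """Return the dotted path of the first File signal, or None."""
    for name, typ in schema.items():
        if typ is File:
            return name
        if is_dataclass(typ):  # recurse into nested models
            nested = get_file_signal({f.name: f.type for f in fields(typ)})
            if nested is not None:
                return f"{name}.{nested}"
    return None
```

Depth-first recursion keeps top-level File signals winning over nested ones, matching the current top-level-only behavior when no nesting is present.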

Development

Successfully merging this pull request may close these issues.

Incremental update
3 participants