Incremental (delta) update #928
Open
ilongin wants to merge 23 commits into main from ilongin/798-incremental-update
Commits (23)
All commits by ilongin:

fa09d0b  adding incremental update
99a5327  continued working on incremental
f01b3a2  finished first test
8fa1534  added from storage incremental update test
67824e6  refactoring
c11d797  merging with main
ee6640d  using delta instead of incremental
5e446b5  added check for modification
71c3469  added another test
83366aa  refactoring
a22916c  added comment
d9e4f26  split tests in new file
c622ba4  Merge branch 'main' into ilongin/798-incremental-update
58c27f0  updated docs
ad5ee5a  Merge branch 'main' into ilongin/798-incremental-update
079ba7a  Merge branch 'main' into ilongin/798-incremental-update
de026e3  Merge branch 'main' into ilongin/798-incremental-update
d1d066e  merged with main
046731b  added sys columns explicitly
9f52c8b  fixing delta to not have old versions in end result
802a934  added append steps
f3a7b12  fixing logic
3e7fa37  Merge branch 'main' into ilongin/798-incremental-update
@@ -0,0 +1,39 @@
from typing import TYPE_CHECKING, Optional

from datachain.error import DatasetNotFoundError

if TYPE_CHECKING:
    from datachain.lib.dc import DataChain


def delta_update(dc: "DataChain", name: str) -> Optional["DataChain"]:
    """
    Creates a new chain that consists of the latest version of the current
    delta dataset plus the diff from the source, with all needed modifications
    applied. This way we don't need to re-calculate the whole chain from the
    source again (applying all the DataChain methods like filters, mappers,
    generators, etc.) but only the diff part, which is very important for
    performance.
    """
    from datachain.lib.dc import DataChain

    file_signal = dc.signals_schema.get_file_signal()
    if not file_signal:
        raise ValueError("Datasets without file signals cannot have delta updates")
    try:
        latest_version = dc.session.catalog.get_dataset(name).latest_version
    except DatasetNotFoundError:
        # first creation of the delta update dataset
        return None

    source_ds_name = dc._query.starting_step.dataset_name
    source_ds_version = dc._query.starting_step.dataset_version
    diff = DataChain.from_dataset(source_ds_name, version=source_ds_version).diff(
        DataChain.from_dataset(name, version=latest_version), on=file_signal
    )
    # We append all the steps from the original chain to the diff,
    # e.g. filters, mappers, generators, etc. This makes sure all needed
    # modifications are applied to the diff part as well.
    diff._query.steps += dc._query.steps

    # merge the diff with the latest version of our dataset
    return diff.union(DataChain.from_dataset(name, latest_version))
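The tests below exercise this helper through DataChain.save(..., delta=True). A minimal usage sketch, assuming a previously saved source dataset named "my_source" (both dataset names here are hypothetical placeholders):

    from datachain.lib.dc import DataChain

    # First save: the delta dataset does not exist yet, delta_update returns
    # None, and "my_delta" version 1 is computed from scratch.
    DataChain.from_dataset("my_source").save("my_delta", delta=True)

    # Later saves: only the diff between "my_source" and the latest version of
    # "my_delta" goes through the chain's steps, and the result is unioned
    # with that latest version to produce the next version.
    DataChain.from_dataset("my_source").save("my_delta", delta=True)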
@@ -0,0 +1,226 @@
import os

import pytest
import regex as re
from PIL import Image

from datachain import func
from datachain.lib.dc import C, DataChain
from datachain.lib.file import File, ImageFile

def test_delta_update_from_dataset(test_session, tmp_dir, tmp_path):
    starting_ds_name = "starting_ds"
    ds_name = "delta_ds"

    images = [
        {"name": "img1.jpg", "data": Image.new(mode="RGB", size=(64, 64))},
        {"name": "img2.jpg", "data": Image.new(mode="RGB", size=(128, 128))},
        {"name": "img3.jpg", "data": Image.new(mode="RGB", size=(64, 64))},
        {"name": "img4.jpg", "data": Image.new(mode="RGB", size=(128, 128))},
    ]

    def create_image_dataset(ds_name, images):
        DataChain.from_values(
            file=[
                ImageFile(path=img["name"], source=f"file://{tmp_path}")
                for img in images
            ],
            session=test_session,
        ).save(ds_name)

    def create_delta_dataset(ds_name):
        DataChain.from_dataset(
            starting_ds_name,
            session=test_session,
        ).save(ds_name, delta=True)

    # first version of the starting dataset
    create_image_dataset(starting_ds_name, images[:2])
    # first version of the delta dataset
    create_delta_dataset(ds_name)
    # second version of the starting dataset
    create_image_dataset(starting_ds_name, images[2:])
    # second version of the delta dataset
    create_delta_dataset(ds_name)

    assert list(
        DataChain.from_dataset(ds_name, version=1)
        .order_by("file.path")
        .collect("file.path")
    ) == [
        "img1.jpg",
        "img2.jpg",
    ]

    assert list(
        DataChain.from_dataset(ds_name, version=2)
        .order_by("file.path")
        .collect("file.path")
    ) == [
        "img1.jpg",
        "img2.jpg",
        "img3.jpg",
        "img4.jpg",
    ]

def test_delta_update_from_storage(test_session, tmp_dir, tmp_path):
    ds_name = "delta_ds"
    path = tmp_dir.as_uri()
    tmp_dir = tmp_dir / "images"
    os.mkdir(tmp_dir)

    images = [
        {
            "name": f"img{i}.{'jpg' if i % 2 == 0 else 'png'}",
            "data": Image.new(mode="RGB", size=((i + 1) * 10, (i + 1) * 10)),
        }
        for i in range(20)
    ]

    # save only half of the images for now
    for img in images[:10]:
        img["data"].save(tmp_dir / img["name"])

    def create_delta_dataset():
        def my_embedding(file: File) -> list[float]:
            return [0.5, 0.5]

        def get_index(file: File) -> int:
            r = r".+\/img(\d+)\.jpg"
            return int(re.search(r, file.path).group(1))  # type: ignore[union-attr]

        (
            DataChain.from_storage(path, update=True, session=test_session)
            .filter(C("file.path").glob("*.jpg"))
            .map(emb=my_embedding)
            .mutate(dist=func.cosine_distance("emb", (0.1, 0.2)))
            .map(index=get_index)
            .filter(C("index") > 3)
            .save(ds_name, delta=True)
        )

    # first version of the delta dataset
    create_delta_dataset()

    # remember the old etags for later comparison, to prove that modified
    # images are also taken into account on delta update
    etags = {
        r[0]: r[1].etag
        for r in DataChain.from_dataset(ds_name, version=1).collect("index", "file")
    }

    # remove the last five of the first ten images to simulate modification,
    # since they are re-created below
    for img in images[5:10]:
        os.remove(tmp_dir / img["name"])

    # save the other half of the images, plus the ones removed above
    for img in images[5:]:
        img["data"].save(tmp_dir / img["name"])

    # second version of the delta dataset
    create_delta_dataset()

    assert list(
        DataChain.from_dataset(ds_name, version=1)
        .order_by("file.path")
        .collect("file.path")
    ) == [
        "images/img4.jpg",
        "images/img6.jpg",
        "images/img8.jpg",
    ]

    assert list(
        DataChain.from_dataset(ds_name, version=2)
        .order_by("file.path")
        .collect("file.path")
    ) == [
        "images/img10.jpg",
        "images/img12.jpg",
        "images/img14.jpg",
        "images/img16.jpg",
        "images/img18.jpg",
        "images/img4.jpg",
        "images/img6.jpg",
        "images/img6.jpg",
        "images/img8.jpg",
        "images/img8.jpg",
    ]

    # check that we have both the old and the new version of modified files
    rows = list(
        DataChain.from_dataset(ds_name, version=2)
        .filter(C("index") == 6)
        .order_by("file.path", "file.etag")
        .collect("file")
    )
    assert rows[0].etag == etags[6]
    assert rows[1].etag > etags[6]  # the new etag is greater, as it's derived from mtime

def test_delta_update_no_diff(test_session, tmp_dir, tmp_path):
    ds_name = "delta_ds"
    path = tmp_dir.as_uri()
    tmp_dir = tmp_dir / "images"
    os.mkdir(tmp_dir)

    images = [
        {"name": f"img{i}.jpg", "data": Image.new(mode="RGB", size=(64, 128))}
        for i in range(10)
    ]

    for img in images:
        img["data"].save(tmp_dir / img["name"])

    def create_delta_dataset():
        def get_index(file: File) -> int:
            r = r".+\/img(\d+)\.jpg"
            return int(re.search(r, file.path).group(1))  # type: ignore[union-attr]

        (
            DataChain.from_storage(path, update=True, session=test_session)
            .filter(C("file.path").glob("*.jpg"))
            .map(index=get_index)
            .filter(C("index") > 5)
            .save(ds_name, delta=True)
        )

    create_delta_dataset()
    create_delta_dataset()

    assert (
        list(
            DataChain.from_dataset(ds_name, version=1)
            .order_by("file.path")
            .collect("file.path")
        )
        == list(
            DataChain.from_dataset(ds_name, version=2)
            .order_by("file.path")
            .collect("file.path")
        )
        == [
            "images/img6.jpg",
            "images/img7.jpg",
            "images/img8.jpg",
            "images/img9.jpg",
        ]
    )

def test_delta_update_no_file_signals(test_session):
    starting_ds_name = "starting_ds"

    DataChain.from_values(num=[10, 20], session=test_session).save(starting_ds_name)

    with pytest.raises(ValueError) as excinfo:
        DataChain.from_dataset(
            starting_ds_name,
            session=test_session,
        ).save("delta_ds", delta=True)

    assert (
        str(excinfo.value) == "Datasets without file signals cannot have delta updates"
    )
Review comment:
Should we also test for a nested file signal, or is that out of the scope of this method? E.g.:
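For illustration only, a nested file signal could look like the sketch below; the `Annotated` model and its fields are hypothetical names, not from this PR. As the reply notes, get_file_signal() currently only considers top-level file objects, so such a schema would hit the ValueError above.

    from datachain import DataModel
    from datachain.lib.file import File


    class Annotated(DataModel):  # hypothetical model, for illustration only
        file: File  # File nested one level down rather than a top-level signal
        label: str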
Reply:
For now it only works for top-level file objects. In the future we can add nested support as well if needed.