Skip to content

Commit

Permalink
chore: Use chunks when batch writing identity overrides (#3143)
Browse files Browse the repository at this point in the history
  • Loading branch information
khvn26 authored Dec 12, 2023
1 parent 746b1fa commit c94eeeb
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 14 deletions.
1 change: 1 addition & 0 deletions api/environments/dynamodb/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
ENVIRONMENTS_V2_SECONDARY_INDEX = "environment_api_key-index"
ENVIRONMENTS_V2_SECONDARY_INDEX_PARTITION_KEY = "environment_api_key"

DYNAMODB_MAX_BATCH_WRITE_ITEM_COUNT = 25
IDENTITIES_PAGINATION_LIMIT = 1000
35 changes: 21 additions & 14 deletions api/environments/dynamodb/dynamodb_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)

from environments.dynamodb.constants import (
DYNAMODB_MAX_BATCH_WRITE_ITEM_COUNT,
ENVIRONMENTS_V2_PARTITION_KEY,
ENVIRONMENTS_V2_SORT_KEY,
IDENTITIES_PAGINATION_LIMIT,
Expand All @@ -38,6 +39,7 @@
map_identity_override_to_identity_override_document,
map_identity_to_identity_document,
)
from util.util import iter_paired_chunks

if typing.TYPE_CHECKING:
from environments.identities.models import Identity
Expand Down Expand Up @@ -239,20 +241,25 @@ def update_identity_overrides(
self,
changeset: IdentityOverridesV2Changeset,
) -> None:
with self._table.batch_writer() as writer:
for identity_override_to_delete in changeset.to_delete:
writer.delete_item(
Key={
ENVIRONMENTS_V2_PARTITION_KEY: identity_override_to_delete.environment_id,
ENVIRONMENTS_V2_SORT_KEY: identity_override_to_delete.document_key,
},
)
for identity_override_to_put in changeset.to_put:
writer.put_item(
Item=map_identity_override_to_identity_override_document(
identity_override_to_put
),
)
for to_put, to_delete in iter_paired_chunks(
changeset.to_put,
changeset.to_delete,
chunk_size=DYNAMODB_MAX_BATCH_WRITE_ITEM_COUNT,
):
with self._table.batch_writer() as writer:
for identity_override_to_delete in to_delete:
writer.delete_item(
Key={
ENVIRONMENTS_V2_PARTITION_KEY: identity_override_to_delete.environment_id,
ENVIRONMENTS_V2_SORT_KEY: identity_override_to_delete.document_key,
},
)
for identity_override_to_put in to_put:
writer.put_item(
Item=map_identity_override_to_identity_override_document(
identity_override_to_put
),
)

def write_environments(self, environments: Iterable["Environment"]) -> None:
with self._table.batch_writer() as writer:
Expand Down
44 changes: 44 additions & 0 deletions api/tests/unit/util/test_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from util.util import iter_paired_chunks


def test__iter_paired_chunks__empty():
    # Two empty iterables produce no chunks at all.
    result = list(iter_paired_chunks([], [], chunk_size=1))
    assert result == []


def test__iter_paired_chunks__first_empty():
    # When the first iterable is empty, every chunk is filled
    # exclusively from the second iterable.
    result = list(iter_paired_chunks([], [1, 2, 3], chunk_size=1))
    expected = [([], [1]), ([], [2]), ([], [3])]
    assert result == expected


def test__iter_paired_chunks__second_empty():
    # When the second iterable is empty, every chunk is filled
    # exclusively from the first iterable.
    result = list(iter_paired_chunks([1, 2, 3], [], chunk_size=1))
    expected = [([1], []), ([2], []), ([3], [])]
    assert result == expected


def test__iter_paired_chunks__first_shorter():
    # Once the first iterable runs out, the second takes over the
    # remaining capacity of each chunk.
    result = list(iter_paired_chunks([1, 2, 3], [4, 5, 6, 7, 8], chunk_size=3))
    expected = [
        ([1, 2], [4]),
        ([3], [5, 6]),
        ([], [7, 8]),
    ]
    assert result == expected


def test__iter_paired_chunks__second_shorter():
    # Renamed from `test__iter_pair_chunks__second_shorter` ("pair" -> "paired")
    # for consistency with the other tests in this module.
    # Once the second iterable runs out, the first takes over the
    # remaining capacity of each chunk.
    assert list(iter_paired_chunks([1, 2, 3, 4, 5], [6, 7, 8], chunk_size=3)) == [
        ([1, 2], [6]),
        ([3, 4], [7]),
        ([5], [8]),
    ]


def test__iter_paired_chunks__same_length():
    # Renamed from `test__iter_pair_chunks__same_length` ("pair" -> "paired")
    # for consistency with the other tests in this module.
    # Equal-length inputs: the first iterable gets ceil(chunk_size / 2)
    # slots of each chunk until exhausted.
    assert list(iter_paired_chunks([1, 2, 3], [4, 5, 6], chunk_size=3)) == [
        ([1, 2], [4]),
        ([3], [5, 6]),
    ]
31 changes: 31 additions & 0 deletions api/util/util.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from itertools import islice
from math import ceil
from threading import Thread
from typing import Generator, Iterable, TypeVar

T = TypeVar("T")


def postpone(function):
Expand All @@ -8,3 +13,29 @@ def decorator(*args, **kwargs):
t.start()

return decorator


def iter_paired_chunks(
    iterable_1: Iterable[T],
    iterable_2: Iterable[T],
    *,
    chunk_size: int,
) -> Generator[tuple[Iterable[T], Iterable[T]], None, None]:
    """
    Consume two iterables side by side, yielding ``(chunk_1, chunk_2)`` pairs
    whose combined length never exceeds ``chunk_size``.

    The first iterable is initially allotted half of each chunk (rounded up);
    once the second iterable is exhausted, the first one may fill entire
    chunks on its own.
    """
    source_1 = iter(iterable_1)
    source_2 = iter(iterable_2)
    # Reserve half of every chunk (rounded up) for the first source to start.
    quota_1 = ceil(chunk_size / 2)
    while True:
        chunk_1 = list(islice(source_1, quota_1))
        # Capacity left over by the first source goes to the second.
        remainder = chunk_size - len(chunk_1)
        chunk_2 = list(islice(source_2, remainder))
        if not chunk_2:
            if not chunk_1:
                # Both sources are exhausted: nothing more to yield.
                break
            # The second source is exhausted while the first still has items:
            # top up this chunk from the first source and let it claim full
            # chunks from now on.
            chunk_1.extend(islice(source_1, remainder))
            quota_1 = chunk_size
        yield chunk_1, chunk_2

0 comments on commit c94eeeb

Please sign in to comment.