Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Edge V2 migration opt-in, capacity budget for migration #3881

Merged
merged 14 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1208,3 +1208,8 @@
"SAML": True, # Security Assertion Markup Language
},
)

EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET = env.int(
"EDGE_V2_MIGRATION_READ_CAPACITY_BUDGET",
default=0,
)
6 changes: 4 additions & 2 deletions api/environments/dynamodb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from .types import DynamoProjectMetadata
from .wrappers import (
from environments.dynamodb.types import DynamoProjectMetadata
from environments.dynamodb.wrappers import (
DynamoEnvironmentAPIKeyWrapper,
DynamoEnvironmentV2Wrapper,
DynamoEnvironmentWrapper,
DynamoIdentityWrapper,
)
from environments.dynamodb.wrappers.exceptions import CapacityBudgetExceeded

__all__ = (
"CapacityBudgetExceeded",
"DynamoEnvironmentAPIKeyWrapper",
"DynamoEnvironmentV2Wrapper",
"DynamoEnvironmentWrapper",
Expand Down
69 changes: 53 additions & 16 deletions api/environments/dynamodb/services.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
import logging
from decimal import Decimal
from typing import Generator, Iterable

from flag_engine.identities.models import IdentityModel

from environments.dynamodb import (
CapacityBudgetExceeded,
DynamoEnvironmentV2Wrapper,
DynamoIdentityWrapper,
)
from environments.dynamodb.types import (
EdgeV2MigrationResult,
IdentityOverridesV2Changeset,
IdentityOverrideV2,
)
from environments.models import Environment
from projects.models import EdgeV2MigrationStatus
from util.mappers import map_engine_feature_state_to_identity_override

logger = logging.getLogger(__name__)


def migrate_environments_to_v2(project_id: int) -> IdentityOverridesV2Changeset | None:
def migrate_environments_to_v2(
project_id: int,
capacity_budget: Decimal,
) -> EdgeV2MigrationResult | None:
"""
Migrate project's environments to `environments_v2` table.

:param project_id: ID of project to migrate.
:param capacity_budget: Max read capacity to spend when reading identities. Can be 0.

:returns: `EdgeV2MigrationResult` object or `None`.

If provided `capacity_budget` exceeded, including a budget of 0,
`EdgeV2MigrationResult.status` is set to `INCOMPLETE` and no identity overrides get
written to `environments_v2` table.
"""
dynamo_wrapper_v2 = DynamoEnvironmentV2Wrapper()
identity_wrapper = DynamoIdentityWrapper()

Expand All @@ -26,39 +45,57 @@ def migrate_environments_to_v2(project_id: int) -> IdentityOverridesV2Changeset

logger.info("Migrating environments to v2 for project %d", project_id)

environments_to_migrate = Environment.objects.filter(project_id=project_id)
identity_overrides_to_migrate = _iter_paginated_overrides(
identity_wrapper=identity_wrapper,
environments=environments_to_migrate,
environments_to_migrate = Environment.objects.filter_for_document_builder(
project_id=project_id
)

changeset = IdentityOverridesV2Changeset(
to_put=list(identity_overrides_to_migrate), to_delete=[]
)
logger.info(
"Retrieved %d identity overrides to migrate for project %d",
len(changeset.to_put),
project_id,
)
identity_overrides_changeset = IdentityOverridesV2Changeset(to_put=[], to_delete=[])
result_status = EdgeV2MigrationStatus.COMPLETE

try:
to_put = list(
_iter_paginated_overrides(
identity_wrapper=identity_wrapper,
environments=environments_to_migrate,
capacity_budget=capacity_budget,
)
)
except CapacityBudgetExceeded as exc:
result_status = EdgeV2MigrationStatus.INCOMPLETE
logger.warning("Incomplete migration for project %d", project_id, exc_info=exc)
else:
identity_overrides_changeset.to_put = to_put
logger.info(
"Retrieved %d identity overrides to migrate for project %d",
len(to_put),
project_id,
)

dynamo_wrapper_v2.write_environments(environments_to_migrate)
dynamo_wrapper_v2.update_identity_overrides(changeset)
dynamo_wrapper_v2.update_identity_overrides(identity_overrides_changeset)

logger.info("Finished migrating environments to v2 for project %d", project_id)
return changeset
return EdgeV2MigrationResult(
identity_overrides_changeset=identity_overrides_changeset,
status=result_status,
)


def _iter_paginated_overrides(
*,
identity_wrapper: DynamoIdentityWrapper,
environments: Iterable[Environment],
capacity_budget: Decimal,
) -> Generator[IdentityOverrideV2, None, None]:
for environment in environments:
environment_api_key = environment.api_key
for item in identity_wrapper.iter_all_items_paginated(
environment_api_key=environment_api_key,
capacity_budget=capacity_budget,
projection_expression="environment_api_key, identifier, identity_features, identity_uuid",
overrides_only=True,
):
identity = IdentityModel.parse_obj(item)
identity = IdentityModel.model_validate(item)
for feature_state in identity.identity_features:
yield map_engine_feature_state_to_identity_override(
feature_state=feature_state,
Expand Down
10 changes: 10 additions & 0 deletions api/environments/dynamodb/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import enum
import typing
from dataclasses import asdict, dataclass
from datetime import datetime

Expand All @@ -7,6 +8,9 @@
from flag_engine.features.models import FeatureStateModel
from pydantic import BaseModel

if typing.TYPE_CHECKING:
from projects.models import EdgeV2MigrationStatus

project_metadata_table = None

if settings.PROJECT_METADATA_TABLE_NAME_DYNAMO:
Expand Down Expand Up @@ -90,3 +94,9 @@ class IdentityOverrideV2(BaseModel):
class IdentityOverridesV2Changeset:
to_delete: list[IdentityOverrideV2]
to_put: list[IdentityOverrideV2]


@dataclass
class EdgeV2MigrationResult:
identity_overrides_changeset: IdentityOverridesV2Changeset
status: "EdgeV2MigrationStatus"
11 changes: 11 additions & 0 deletions api/environments/dynamodb/wrappers/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from decimal import Decimal


class CapacityBudgetExceeded(Exception):
def __init__(
self,
capacity_budget: Decimal,
capacity_spent: Decimal,
) -> None:
self.capacity_budget = capacity_budget
self.capacity_spent = capacity_spent
38 changes: 30 additions & 8 deletions api/environments/dynamodb/wrappers/identity_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
import typing
from contextlib import suppress
from decimal import Decimal
from typing import Iterable

from boto3.dynamodb.conditions import Key
from boto3.dynamodb.conditions import Attr, Key
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from flag_engine.environments.models import EnvironmentModel
Expand All @@ -12,12 +13,14 @@
from rest_framework.exceptions import NotFound

from environments.dynamodb.constants import IDENTITIES_PAGINATION_LIMIT
from environments.dynamodb.wrappers.exceptions import CapacityBudgetExceeded
from util.mappers import map_identity_to_identity_document

from .base import BaseDynamoWrapper
from .environment_wrapper import DynamoEnvironmentWrapper

if typing.TYPE_CHECKING:
from boto3.dynamodb.conditions import ConditionBase
from mypy_boto3_dynamodb.type_defs import (
QueryInputRequestTypeDef,
QueryOutputTableTypeDef,
Expand Down Expand Up @@ -89,37 +92,56 @@ def get_all_items(
environment_api_key: str,
limit: int,
start_key: dict[str, "TableAttributeValueTypeDef"] | None = None,
projection_expression: str = None,
filter_expression: "ConditionBase | str | None" = None,
projection_expression: str | None = None,
return_consumed_capacity: bool = False,
) -> "QueryOutputTableTypeDef":
filter_expression = Key("environment_api_key").eq(environment_api_key)
key_condition_expression = Key("environment_api_key").eq(environment_api_key)
query_kwargs: "QueryInputRequestTypeDef" = {
"IndexName": "environment_api_key-identifier-index",
"KeyConditionExpression": filter_expression,
"KeyConditionExpression": key_condition_expression,
"Limit": limit,
}
if projection_expression:
query_kwargs["ProjectionExpression"] = projection_expression

if start_key:
query_kwargs["ExclusiveStartKey"] = start_key
if filter_expression:
query_kwargs["FilterExpression"] = filter_expression
if projection_expression:
query_kwargs["ProjectionExpression"] = projection_expression
if return_consumed_capacity:
# Use `TOTAL` because we don't need per-index/per-table consumed capacity
query_kwargs["ReturnConsumedCapacity"] = "TOTAL"
return self.query_items(**query_kwargs)

def iter_all_items_paginated(
self,
environment_api_key: str,
limit: int = IDENTITIES_PAGINATION_LIMIT,
projection_expression: str = None,
projection_expression: str | None = None,
capacity_budget: Decimal = Decimal("Inf"),
overrides_only: bool = False,
) -> typing.Generator[dict, None, None]:
last_evaluated_key = "initial"
get_all_items_kwargs = {
"environment_api_key": environment_api_key,
"limit": limit,
"projection_expression": projection_expression,
"return_consumed_capacity": capacity_budget != Decimal("Inf"),
}
if overrides_only:
get_all_items_kwargs["filter_expression"] = Attr("identity_features").ne([])
capacity_spent = 0
while last_evaluated_key:
if capacity_spent >= capacity_budget:
raise CapacityBudgetExceeded(
capacity_budget=capacity_budget,
capacity_spent=capacity_spent,
)
query_response = self.get_all_items(
**get_all_items_kwargs,
)
with suppress(KeyError):
capacity_spent += query_response["ConsumedCapacity"]["CapacityUnits"]
for item in query_response["Items"]:
yield item
if last_evaluated_key := query_response.get("LastEvaluatedKey"):
Expand Down
8 changes: 2 additions & 6 deletions api/environments/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from features.models import Feature, FeatureSegment, FeatureState
from features.multivariate.models import MultivariateFeatureStateValue
from metadata.models import Metadata
from projects.models import IdentityOverridesV2MigrationStatus, Project
from projects.models import Project
from segments.models import Segment
from util.mappers import map_environment_to_sdk_document
from webhooks.models import AbstractBaseExportableWebhookModel
Expand Down Expand Up @@ -267,11 +267,7 @@ def write_environments_to_dynamodb(

environment_wrapper.write_environments(environments)

if (
project.identity_overrides_v2_migration_status
== IdentityOverridesV2MigrationStatus.COMPLETE
and environment_v2_wrapper.is_enabled
):
if project.edge_v2_environments_migrated and environment_v2_wrapper.is_enabled:
environment_v2_wrapper.write_environments(environments)

def get_feature_state(
Expand Down
25 changes: 12 additions & 13 deletions api/features/features_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
)
from features.dataclasses import EnvironmentFeatureOverridesData
from features.versioning.versioning_service import get_environment_flags_list
from projects.models import IdentityOverridesV2MigrationStatus

if typing.TYPE_CHECKING:
from environments.models import Environment
Expand All @@ -25,21 +24,21 @@ def get_overrides_data(
:return: overrides data getter
"""
project = environment.project
match project.enable_dynamo_db, project.identity_overrides_v2_migration_status:
case True, IdentityOverridesV2MigrationStatus.COMPLETE:

if project.enable_dynamo_db:
if project.edge_v2_identity_overrides_migrated:
# If v2 migration is complete, count segment overrides from Core
# and identity overrides from DynamoDB.
return get_edge_overrides_data(environment)
case True, _:
# If v2 migration is in progress or not started, we want to count Core overrides,
# but only the segment ones, as the identity ones in DynamoDB are uncountable for v1.
return get_core_overrides_data(
environment,
skip_identity_overrides=True,
)
case _, _:
# For projects still fully on Core, count all overrides from Core.
return get_core_overrides_data(environment)
# If v2 migration is not started, in progress, or incomplete,
# only count segment overrides from Core.
# v1 Edge identity overrides are uncountable.
return get_core_overrides_data(
environment,
skip_identity_overrides=True,
)
# For projects still fully on Core, count all overrides from Core.
return get_core_overrides_data(environment)


def get_core_overrides_data(
Expand Down
2 changes: 1 addition & 1 deletion api/projects/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class ProjectAdmin(admin.ModelAdmin):
"max_segments_allowed",
"max_features_allowed",
"max_segment_overrides_allowed",
"identity_overrides_v2_migration_status",
"edge_v2_migration_status",
)

@admin.action(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from django.db import migrations, models
from django.db.backends.base.schema import BaseDatabaseSchemaEditor

from projects.models import IdentityOverridesV2MigrationStatus
from projects.models import EdgeV2MigrationStatus


def apply_defaults(apps: Apps, schema_editor: BaseDatabaseSchemaEditor) -> None:
apps.get_model("projects", "Project").objects.all().update(
identity_overrides_v2_migration_status=IdentityOverridesV2MigrationStatus.NOT_STARTED
identity_overrides_v2_migration_status=EdgeV2MigrationStatus.NOT_STARTED
)


Expand Down
Loading