From e8b203be0ab9e2547002fec7c651d32aa45a94cb Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Tue, 11 Mar 2025 15:05:48 +0000 Subject: [PATCH] fix: Some tasks are not being initialised (#5213) --- api/app/settings/common.py | 5 ++- api/app_analytics/cache.py | 7 +++- api/app_analytics/tasks.py | 20 ++++++++-- api/app_analytics/track.py | 5 --- api/app_analytics/views.py | 6 ++- api/audit/signals.py | 3 +- api/custom_auth/apps.py | 1 - api/edge_api/identities/apps.py | 4 +- .../identities/edge_request_forwarder.py | 37 ++++++------------- api/edge_api/identities/tasks.py | 25 +++++++++++++ api/environments/identities/traits/views.py | 2 +- api/environments/identities/views.py | 2 +- api/features/apps.py | 1 - api/features/feature_health/apps.py | 7 ---- api/features/tasks.py | 4 +- api/organisations/chargebee/apps.py | 5 +++ api/poetry.lock | 6 +-- api/pyproject.toml | 2 +- api/segments/apps.py | 5 --- .../identities/test_edge_request_forwarder.py | 30 +++------------ api/webhooks/apps.py | 5 +++ api/webhooks/tasks.py | 25 +++++++++++++ api/webhooks/webhooks.py | 10 ++--- 23 files changed, 121 insertions(+), 96 deletions(-) create mode 100644 api/organisations/chargebee/apps.py create mode 100644 api/webhooks/apps.py create mode 100644 api/webhooks/tasks.py diff --git a/api/app/settings/common.py b/api/app/settings/common.py index 8c058b1b5a4f..42f7817f6338 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -94,16 +94,19 @@ "corsheaders", "users", "organisations", + "organisations.chargebee", "organisations.invites", "organisations.permissions", "projects", "sales_dashboard", "edge_api", + "edge_api.identities", "environments", "environments.permissions", "environments.identities", "environments.identities.traits", "features", + "features.feature_external_resources", "features.feature_health", "features.import_export", "features.multivariate", @@ -118,7 +121,7 @@ "permissions", "projects.tags", "api_keys", - "features.feature_external_resources", + "webhooks", # 2FA "custom_auth.mfa.trench", # health check plugins diff --git a/api/app_analytics/cache.py b/api/app_analytics/cache.py index 00cea0151f1d..f30b343c676b 100644 --- a/api/app_analytics/cache.py +++ b/api/app_analytics/cache.py @@ -4,8 +4,11 @@ from django.conf import settings from django.utils import timezone -from app_analytics.tasks import track_feature_evaluation, track_request -from app_analytics.track import track_feature_evaluation_influxdb +from app_analytics.tasks import ( + track_feature_evaluation, + track_feature_evaluation_influxdb, + track_request, +) class APIUsageCache: diff --git a/api/app_analytics/tasks.py b/api/app_analytics/tasks.py index 6980fbde06b5..3cb50e1a7c24 100644 --- a/api/app_analytics/tasks.py +++ b/api/app_analytics/tasks.py @@ -10,14 +10,19 @@ ) from app_analytics.constants import ANALYTICS_READ_BUCKET_SIZE -from environments.models import Environment - -from .models import ( +from app_analytics.models import ( APIUsageBucket, APIUsageRaw, FeatureEvaluationBucket, FeatureEvaluationRaw, ) +from app_analytics.track import ( + track_feature_evaluation_influxdb as track_feature_evaluation_influxdb_service, +) +from app_analytics.track import ( + track_feature_evaluation_influxdb_v2 as track_feature_evaluation_influxdb_v2_service, +) +from environments.models import Environment if settings.USE_POSTGRES_FOR_ANALYTICS: @@ -111,6 +116,15 @@ def track_request(resource: int, host: str, environment_key: str, count: int = 1 ) +track_feature_evaluation_influxdb = register_task_handler()( + track_feature_evaluation_influxdb_service +) + +track_feature_evaluation_influxdb_v2 = register_task_handler()( + track_feature_evaluation_influxdb_v2_service +) + + def get_start_of_current_bucket(bucket_size: int) -> datetime: if bucket_size > 60: raise ValueError("Bucket size cannot be greater than 60 minutes") diff --git a/api/app_analytics/track.py b/api/app_analytics/track.py index 87e80d2ce555..ce57ef55f070 100644 --- a/api/app_analytics/track.py +++ b/api/app_analytics/track.py @@ -7,9 +7,6 @@ from six.moves.urllib.parse import ( # type: ignore[import-untyped] quote, # python 2/3 compatible urllib import ) -from task_processor.decorators import ( # type: ignore[import-untyped] - register_task_handler, -) from app_analytics.influxdb_wrapper import InfluxDBWrapper from environments.models import Environment @@ -130,7 +127,6 @@ def track_request_influxdb(request): # type: ignore[no-untyped-def] influxdb.write() # type: ignore[no-untyped-call] -@register_task_handler() # type: ignore[misc] def track_feature_evaluation_influxdb( environment_id: int, feature_evaluations: dict[str, int] ) -> None: @@ -149,7 +145,6 @@ def track_feature_evaluation_influxdb( influxdb.write() # type: ignore[no-untyped-call] -@register_task_handler() # type: ignore[misc] def track_feature_evaluation_influxdb_v2( environment_id: int, feature_evaluations: list[dict[str, int | str | bool]] ) -> None: diff --git a/api/app_analytics/views.py b/api/app_analytics/views.py index 6f0a3fb06225..ef6cdaa55b44 100644 --- a/api/app_analytics/views.py +++ b/api/app_analytics/views.py @@ -16,8 +16,10 @@ get_usage_data, ) from app_analytics.cache import FeatureEvaluationCache -from app_analytics.tasks import track_feature_evaluation_v2 -from app_analytics.track import track_feature_evaluation_influxdb_v2 +from app_analytics.tasks import ( + track_feature_evaluation_influxdb_v2, + track_feature_evaluation_v2, +) from environments.authentication import EnvironmentKeyAuthentication from environments.permissions.permissions import EnvironmentKeyPermissions from features.models import FeatureState diff --git a/api/audit/signals.py b/api/audit/signals.py index 8a6cf1550302..9faaf7767863 100644 --- a/api/audit/signals.py +++ b/api/audit/signals.py @@ -14,7 +14,8 @@ from integrations.new_relic.new_relic import NewRelicWrapper from integrations.slack.slack import SlackWrapper from organisations.models import OrganisationWebhook -from webhooks.webhooks import WebhookEventType, call_organisation_webhooks +from webhooks.tasks import call_organisation_webhooks +from webhooks.webhooks import WebhookEventType logger = logging.getLogger(__name__) diff --git a/api/custom_auth/apps.py b/api/custom_auth/apps.py index 005927503540..13cbc25e6bf8 100644 --- a/api/custom_auth/apps.py +++ b/api/custom_auth/apps.py @@ -5,5 +5,4 @@ class CustomAuthAppConfig(AppConfig): name = "custom_auth" def ready(self) -> None: - from custom_auth import tasks # noqa F401 from custom_auth.jwt_cookie import signals # noqa F401 diff --git a/api/edge_api/identities/apps.py b/api/edge_api/identities/apps.py index f223ebb31ead..df5079120864 100644 --- a/api/edge_api/identities/apps.py +++ b/api/edge_api/identities/apps.py @@ -3,6 +3,4 @@ class EdgeAPIIdentitiesAppConfig(AppConfig): name = "edge_api.identities" - - def ready(self): # type: ignore[no-untyped-def] - from . import tasks # noqa + label = "edge_api_identities" diff --git a/api/edge_api/identities/edge_request_forwarder.py b/api/edge_api/identities/edge_request_forwarder.py index 35f347ec0cce..ebc17bc82e3a 100644 --- a/api/edge_api/identities/edge_request_forwarder.py +++ b/api/edge_api/identities/edge_request_forwarder.py @@ -1,23 +1,14 @@ import json +from typing import Any import requests from django.conf import settings -from task_processor.decorators import ( # type: ignore[import-untyped] - register_task_handler, -) -from task_processor.models import TaskPriority # type: ignore[import-untyped] from core.constants import FLAGSMITH_SIGNATURE_HEADER from core.signing import sign_payload from environments.dynamodb.migrator import IdentityMigrator -def _should_forward(project_id: int) -> bool: - migrator = IdentityMigrator(project_id) # type: ignore[no-untyped-call] - return bool(migrator.is_migration_done) - - -@register_task_handler(queue_size=2000, priority=TaskPriority.LOW) # type: ignore[misc] def forward_identity_request( # type: ignore[no-untyped-def] request_method: str, headers: dict, # type: ignore[type-arg] @@ -38,21 +29,11 @@ def forward_identity_request( # type: ignore[no-untyped-def] requests.get(url, params=query_params, headers=headers, timeout=5) -@register_task_handler(queue_size=2000, priority=TaskPriority.LOW) # type: ignore[misc] def forward_trait_request( # type: ignore[no-untyped-def] request_method: str, - headers: dict, # type: ignore[type-arg] + headers: dict[str, str], project_id: int, - payload: dict, # type: ignore[type-arg] -): - return forward_trait_request_sync(request_method, headers, project_id, payload) - - -def forward_trait_request_sync( # type: ignore[no-untyped-def] - request_method: str, - headers: dict, # type: ignore[type-arg] - project_id: int, - payload: dict, # type: ignore[type-arg] + payload: dict[str, Any], ): if not _should_forward(project_id): return @@ -67,15 +48,19 @@ def forward_trait_request_sync( # type: ignore[no-untyped-def] ) -@register_task_handler(queue_size=1000, priority=TaskPriority.LOW) # type: ignore[misc] def forward_trait_requests( # type: ignore[no-untyped-def] request_method: str, - headers: str, + headers: dict[str, str], project_id: int, - payload: dict, # type: ignore[type-arg] + payload: list[dict[str, Any]], ): for trait_data in payload: - forward_trait_request_sync(request_method, headers, project_id, trait_data) # type: ignore[arg-type] + forward_trait_request(request_method, headers, project_id, trait_data) + + +def _should_forward(project_id: int) -> bool: + migrator = IdentityMigrator(project_id) # type: ignore[no-untyped-call] + return bool(migrator.is_migration_done) def _get_headers(request_method: str, headers: dict, payload: str = "") -> dict: # type: ignore[type-arg] diff --git a/api/edge_api/identities/tasks.py b/api/edge_api/identities/tasks.py index 9415a7dd3fad..039c0c0495e2 100644 --- a/api/edge_api/identities/tasks.py +++ b/api/edge_api/identities/tasks.py @@ -9,6 +9,15 @@ from audit.models import AuditLog from audit.related_object_type import RelatedObjectType +from edge_api.identities.edge_request_forwarder import ( + forward_identity_request as forward_identity_request_service, +) +from edge_api.identities.edge_request_forwarder import ( + forward_trait_request as forward_trait_request_service, +) +from edge_api.identities.edge_request_forwarder import ( + forward_trait_requests as forward_trait_requests_service, +) from edge_api.identities.types import IdentityChangeset from environments.dynamodb import DynamoEnvironmentV2Wrapper from environments.models import Environment, Webhook @@ -172,3 +181,19 @@ def delete_environments_v2_identity_overrides_by_feature(feature_id: int) -> Non dynamodb_wrapper_v2.delete_identity_overrides( environment_id=environment.id, feature_id=feature_id ) + + +forward_identity_request = register_task_handler( + queue_size=2000, + priority=TaskPriority.LOW, +)(forward_identity_request_service) + +forward_trait_request = register_task_handler( + queue_size=2000, + priority=TaskPriority.LOW, +)(forward_trait_request_service) + +forward_trait_requests = register_task_handler( + queue_size=1000, + priority=TaskPriority.LOW, +)(forward_trait_requests_service) diff --git a/api/environments/identities/traits/views.py b/api/environments/identities/traits/views.py index 5b5c9e31034f..2265b2582450 100644 --- a/api/environments/identities/traits/views.py +++ b/api/environments/identities/traits/views.py @@ -12,7 +12,7 @@ from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response -from edge_api.identities.edge_request_forwarder import ( +from edge_api.identities.tasks import ( forward_trait_request, forward_trait_requests, ) diff --git a/api/environments/identities/views.py b/api/environments/identities/views.py index e99992377519..d59dae447452 100644 --- a/api/environments/identities/views.py +++ b/api/environments/identities/views.py @@ -18,7 +18,7 @@ from app.pagination import CustomPagination from core.constants import FLAGSMITH_UPDATED_AT_HEADER from core.request_origin import RequestOrigin -from edge_api.identities.edge_request_forwarder import forward_identity_request +from edge_api.identities.tasks import forward_identity_request from environments.identities.models import Identity from environments.identities.serializers import ( IdentitySerializer, diff --git a/api/features/apps.py b/api/features/apps.py index 6be4feeb21bd..819b65fec6c2 100644 --- a/api/features/apps.py +++ b/api/features/apps.py @@ -13,4 +13,3 @@ def ready(self): # type: ignore[no-untyped-def] # noinspection PyUnresolvedReferences import features.signals # noqa - import features.tasks # noqa diff --git a/api/features/feature_health/apps.py b/api/features/feature_health/apps.py index e2c7c7fa4088..6372813387ae 100644 --- a/api/features/feature_health/apps.py +++ b/api/features/feature_health/apps.py @@ -4,10 +4,3 @@ class FeatureHealthConfig(BaseAppConfig): name = "features.feature_health" default = True - - def ready(self): # type: ignore[no-untyped-def] - from features.feature_health.tasks import ( # noqa - update_feature_unhealthy_tag, - ) - - return super().ready() # type: ignore[no-untyped-call] diff --git a/api/features/tasks.py b/api/features/tasks.py index adfae9bfb677..25f77433c627 100644 --- a/api/features/tasks.py +++ b/api/features/tasks.py @@ -7,11 +7,11 @@ from environments.models import Webhook from features.models import Feature, FeatureState from webhooks.constants import WEBHOOK_DATETIME_FORMAT -from webhooks.webhooks import ( - WebhookEventType, +from webhooks.tasks import ( call_environment_webhooks, call_organisation_webhooks, ) +from webhooks.webhooks import WebhookEventType from .models import HistoricalFeatureState # type: ignore[attr-defined] diff --git a/api/organisations/chargebee/apps.py b/api/organisations/chargebee/apps.py new file mode 100644 index 000000000000..fdc242111d60 --- /dev/null +++ b/api/organisations/chargebee/apps.py @@ -0,0 +1,5 @@ +from django.apps import AppConfig + + +class ChargebeeAppConfig(AppConfig): + name = "organisations.chargebee" diff --git a/api/poetry.lock b/api/poetry.lock index e2fe363174da..77b91a509762 100644 --- a/api/poetry.lock +++ b/api/poetry.lock @@ -2007,8 +2007,8 @@ simplejson = "~3.19.1" [package.source] type = "git" url = "https://github.com/Flagsmith/flagsmith-task-processor" -reference = "v1.3.1" -resolved_reference = "fffd43bc92bfe477fbc1e4ab435610b6e23639d2" +reference = "v1.3.2" +resolved_reference = "366274d584e28b9833ee0937511baafc2c6ec71b" [[package]] name = "freezegun" @@ -5199,4 +5199,4 @@ files = [ [metadata] lock-version = "2.1" python-versions = ">3.11,<3.13" -content-hash = "3263dce1b5b4fdad2c7fcb52e98e4fec1f77bb44ae5edb7f430a84318fae6bef" +content-hash = "6acc9646db22a81fa144486ef9630ddb774b9c24678a786b6e0ef0bf6efcb0a6" diff --git a/api/pyproject.toml b/api/pyproject.toml index 6876943fc0a2..d958a52f5405 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -154,7 +154,7 @@ pygithub = "2.1.1" hubspot-api-client = "^8.2.1" djangorestframework-dataclasses = "^1.3.1" pyotp = "^2.9.0" -flagsmith-task-processor = { git = "https://github.com/Flagsmith/flagsmith-task-processor", tag = "v1.3.1" } +flagsmith-task-processor = { git = "https://github.com/Flagsmith/flagsmith-task-processor", tag = "v1.3.2" } flagsmith-common = { git = "https://github.com/Flagsmith/flagsmith-common", tag = "v1.4.2" } tzdata = "^2024.1" djangorestframework-simplejwt = "^5.3.1" diff --git a/api/segments/apps.py b/api/segments/apps.py index 06566725c244..559aa55f0885 100644 --- a/api/segments/apps.py +++ b/api/segments/apps.py @@ -4,8 +4,3 @@ class SegmentsConfig(BaseAppConfig): name = "segments" default = True - - def ready(self) -> None: - super().ready() # type: ignore[no-untyped-call] - - import segments.tasks # noqa diff --git a/api/tests/unit/edge_api/identities/test_edge_request_forwarder.py b/api/tests/unit/edge_api/identities/test_edge_request_forwarder.py index fcdc2ffbb0bf..7e6a535e5720 100644 --- a/api/tests/unit/edge_api/identities/test_edge_request_forwarder.py +++ b/api/tests/unit/edge_api/identities/test_edge_request_forwarder.py @@ -1,18 +1,18 @@ import json import pytest +from pytest_mock import MockerFixture from core.constants import FLAGSMITH_SIGNATURE_HEADER from edge_api.identities.edge_request_forwarder import ( forward_identity_request, forward_trait_request, - forward_trait_request_sync, forward_trait_requests, ) @pytest.mark.parametrize( - "forwarder_function", [forward_identity_request, forward_trait_request_sync] + "forwarder_function", [forward_identity_request, forward_trait_request] ) def test_forwarder_function_makes_no_request_if_migration_is_not_yet_done( # type: ignore[no-untyped-def] mocker, forwarder_mocked_requests, forwarder_mocked_migrator, forwarder_function @@ -118,7 +118,7 @@ def test_forward_trait_request_sync_makes_correct_post_request( # type: ignore[ ).is_migration_done = mocked_migration_done # When - forward_trait_request_sync("POST", headers, project_id, payload=request_data) + forward_trait_request("POST", headers, project_id, payload=request_data) # Then args, kwargs = forwarder_mocked_requests.post.call_args @@ -131,30 +131,10 @@ def test_forward_trait_request_sync_makes_correct_post_request( # type: ignore[ forwarder_mocked_migrator.assert_called_with(project_id) -def test_forward_trait_request_calls_sync_function_correctly(mocker): # type: ignore[no-untyped-def] +def test_forward_trait_requests__calls_expected(mocker: MockerFixture) -> None: # Given mocked_forward_trait_request = mocker.patch( - "edge_api.identities.edge_request_forwarder.forward_trait_request_sync", - autospec=True, - ) - request_method = "POST" - headers = {"X-Environment-Key": "test_api_key"} - project_id = 1 - payload = {"identity": {"identifier": "test_user_123"}} - - # When - forward_trait_request(request_method, headers, project_id, payload) - - # Then - mocked_forward_trait_request.assert_called_with( - request_method, headers, project_id, payload - ) - - -def test_forward_trait_requests_calls_sync_function_correctly(mocker): # type: ignore[no-untyped-def] - # Given - mocked_forward_trait_request = mocker.patch( - "edge_api.identities.edge_request_forwarder.forward_trait_request_sync", + "edge_api.identities.edge_request_forwarder.forward_trait_request", autospec=True, ) request_method = "POST" diff --git a/api/webhooks/apps.py b/api/webhooks/apps.py new file mode 100644 index 000000000000..3a356b5e2f5c --- /dev/null +++ b/api/webhooks/apps.py @@ -0,0 +1,5 @@ +from django.apps import AppConfig + + +class WebhooksAppConfig(AppConfig): + name = "webhooks" diff --git a/api/webhooks/tasks.py b/api/webhooks/tasks.py new file mode 100644 index 000000000000..be9ba300e40a --- /dev/null +++ b/api/webhooks/tasks.py @@ -0,0 +1,25 @@ +from task_processor.decorators import ( # type: ignore[import-untyped] + register_task_handler, +) + +from webhooks.webhooks import ( + call_environment_webhooks as call_environment_webhooks_service, +) +from webhooks.webhooks import ( + call_organisation_webhooks as call_organisation_webhooks_service, +) +from webhooks.webhooks import ( + call_webhook_with_failure_mail_after_retries as call_webhook_with_failure_mail_after_retries_service, +) + +call_environment_webhooks = register_task_handler()( + call_environment_webhooks_service, +) + +call_organisation_webhooks = register_task_handler()( + call_organisation_webhooks_service, +) + +call_webhook_with_failure_mail_after_retries = register_task_handler()( + call_webhook_with_failure_mail_after_retries_service, +) diff --git a/api/webhooks/webhooks.py b/api/webhooks/webhooks.py index a20a1ff9f378..70a98cced41f 100644 --- a/api/webhooks/webhooks.py +++ b/api/webhooks/webhooks.py @@ -11,9 +11,6 @@ from django.core.serializers.json import DjangoJSONEncoder from django.template.loader import get_template from django.utils import timezone -from task_processor.decorators import ( # type: ignore[import-untyped] - register_task_handler, -) from task_processor.task_run_method import TaskRunMethod # type: ignore[import-untyped] from core.constants import FLAGSMITH_SIGNATURE_HEADER @@ -63,7 +60,6 @@ def get_webhook_model( return Webhook -@register_task_handler() # type: ignore[misc] def call_environment_webhooks( # type: ignore[no-untyped-def] environment_id: int, data: typing.Mapping, # type: ignore[type-arg] @@ -94,7 +90,6 @@ def call_environment_webhooks( # type: ignore[no-untyped-def] ) -@register_task_handler() # type: ignore[misc] def call_organisation_webhooks( # type: ignore[no-untyped-def] organisation_id: int, data: typing.Mapping, # type: ignore[type-arg] @@ -169,7 +164,6 @@ def _call_webhook( raise -@register_task_handler() # type: ignore[misc] def call_webhook_with_failure_mail_after_retries( # type: ignore[no-untyped-def] webhook_id: int, data: typing.Mapping, # type: ignore[type-arg] @@ -219,6 +213,8 @@ def call_webhook_with_failure_mail_after_retries( # type: ignore[no-untyped-def f"{f'HTTP {exc.response.status_code}' if exc.response else 'N/A'} ({exc.__class__.__name__})", ) else: + from webhooks.tasks import call_webhook_with_failure_mail_after_retries + call_webhook_with_failure_mail_after_retries.delay( delay_until=( timezone.now() @@ -248,6 +244,8 @@ def _call_webhooks( # type: ignore[no-untyped-def] webhook_type: WebhookType, retries: int = settings.WEBHOOK_BACKOFF_RETRIES, ): + from webhooks.tasks import call_webhook_with_failure_mail_after_retries + webhook_data = {"event_type": event_type, "data": data} serializer = WebhookSerializer(data=webhook_data) serializer.is_valid(raise_exception=False)