Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Some tasks are not being initialised #5213

Merged
merged 7 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
"projects",
"sales_dashboard",
"edge_api",
"edge_api.identities",
"environments",
"environments.permissions",
"environments.identities",
Expand Down
7 changes: 5 additions & 2 deletions api/app_analytics/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
from django.conf import settings
from django.utils import timezone

from app_analytics.tasks import track_feature_evaluation, track_request
from app_analytics.track import track_feature_evaluation_influxdb
from app_analytics.tasks import (
track_feature_evaluation,
track_feature_evaluation_influxdb,
track_request,
)


class APIUsageCache:
Expand Down
20 changes: 17 additions & 3 deletions api/app_analytics/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
)

from app_analytics.constants import ANALYTICS_READ_BUCKET_SIZE
from environments.models import Environment

from .models import (
from app_analytics.models import (
APIUsageBucket,
APIUsageRaw,
FeatureEvaluationBucket,
FeatureEvaluationRaw,
)
from app_analytics.track import (
track_feature_evaluation_influxdb as track_feature_evaluation_influxdb_service,
)
from app_analytics.track import (
track_feature_evaluation_influxdb_v2 as track_feature_evaluation_influxdb_v2_service,
)
from environments.models import Environment

if settings.USE_POSTGRES_FOR_ANALYTICS:

Expand Down Expand Up @@ -111,6 +116,15 @@ def track_request(resource: int, host: str, environment_key: str, count: int = 1
)


track_feature_evaluation_influxdb = register_task_handler()(
track_feature_evaluation_influxdb_service
)

track_feature_evaluation_influxdb_v2 = register_task_handler()(
track_feature_evaluation_influxdb_v2_service
)


def get_start_of_current_bucket(bucket_size: int) -> datetime:
if bucket_size > 60:
raise ValueError("Bucket size cannot be greater than 60 minutes")
Expand Down
5 changes: 0 additions & 5 deletions api/app_analytics/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
from six.moves.urllib.parse import ( # type: ignore[import-untyped]
quote, # python 2/3 compatible urllib import
)
from task_processor.decorators import ( # type: ignore[import-untyped]
register_task_handler,
)

from app_analytics.influxdb_wrapper import InfluxDBWrapper
from environments.models import Environment
Expand Down Expand Up @@ -130,7 +127,6 @@ def track_request_influxdb(request): # type: ignore[no-untyped-def]
influxdb.write() # type: ignore[no-untyped-call]


@register_task_handler() # type: ignore[misc]
def track_feature_evaluation_influxdb(
environment_id: int, feature_evaluations: dict[str, int]
) -> None:
Expand All @@ -149,7 +145,6 @@ def track_feature_evaluation_influxdb(
influxdb.write() # type: ignore[no-untyped-call]


@register_task_handler() # type: ignore[misc]
def track_feature_evaluation_influxdb_v2(
environment_id: int, feature_evaluations: list[dict[str, int | str | bool]]
) -> None:
Expand Down
6 changes: 4 additions & 2 deletions api/app_analytics/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
get_usage_data,
)
from app_analytics.cache import FeatureEvaluationCache
from app_analytics.tasks import track_feature_evaluation_v2
from app_analytics.track import track_feature_evaluation_influxdb_v2
from app_analytics.tasks import (
track_feature_evaluation_influxdb_v2,
track_feature_evaluation_v2,
)
from environments.authentication import EnvironmentKeyAuthentication
from environments.permissions.permissions import EnvironmentKeyPermissions
from features.models import FeatureState
Expand Down
1 change: 0 additions & 1 deletion api/custom_auth/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ class CustomAuthAppConfig(AppConfig):
name = "custom_auth"

def ready(self) -> None:
from custom_auth import tasks # noqa F401
from custom_auth.jwt_cookie import signals # noqa F401
4 changes: 1 addition & 3 deletions api/edge_api/identities/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,4 @@

class EdgeAPIIdentitiesAppConfig(AppConfig):
name = "edge_api.identities"

def ready(self): # type: ignore[no-untyped-def]
from . import tasks # noqa
label = "edge_api_identities"
37 changes: 11 additions & 26 deletions api/edge_api/identities/edge_request_forwarder.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
import json
from typing import Any

import requests
from django.conf import settings
from task_processor.decorators import ( # type: ignore[import-untyped]
register_task_handler,
)
from task_processor.models import TaskPriority # type: ignore[import-untyped]

from core.constants import FLAGSMITH_SIGNATURE_HEADER
from core.signing import sign_payload
from environments.dynamodb.migrator import IdentityMigrator


def _should_forward(project_id: int) -> bool:
migrator = IdentityMigrator(project_id) # type: ignore[no-untyped-call]
return bool(migrator.is_migration_done)


@register_task_handler(queue_size=2000, priority=TaskPriority.LOW) # type: ignore[misc]
def forward_identity_request( # type: ignore[no-untyped-def]
request_method: str,
headers: dict, # type: ignore[type-arg]
Expand All @@ -38,21 +29,11 @@ def forward_identity_request( # type: ignore[no-untyped-def]
requests.get(url, params=query_params, headers=headers, timeout=5)


@register_task_handler(queue_size=2000, priority=TaskPriority.LOW) # type: ignore[misc]
def forward_trait_request( # type: ignore[no-untyped-def]
request_method: str,
headers: dict, # type: ignore[type-arg]
headers: dict[str, str],
project_id: int,
payload: dict, # type: ignore[type-arg]
):
return forward_trait_request_sync(request_method, headers, project_id, payload)


def forward_trait_request_sync( # type: ignore[no-untyped-def]
request_method: str,
headers: dict, # type: ignore[type-arg]
project_id: int,
payload: dict, # type: ignore[type-arg]
payload: dict[str, Any],
):
if not _should_forward(project_id):
return
Expand All @@ -67,15 +48,19 @@ def forward_trait_request_sync( # type: ignore[no-untyped-def]
)


@register_task_handler(queue_size=1000, priority=TaskPriority.LOW) # type: ignore[misc]
def forward_trait_requests( # type: ignore[no-untyped-def]
request_method: str,
headers: str,
headers: dict[str, str],
project_id: int,
payload: dict, # type: ignore[type-arg]
payload: list[dict[str, Any]],
):
for trait_data in payload:
forward_trait_request_sync(request_method, headers, project_id, trait_data) # type: ignore[arg-type]
forward_trait_request(request_method, headers, project_id, trait_data)


def _should_forward(project_id: int) -> bool:
migrator = IdentityMigrator(project_id) # type: ignore[no-untyped-call]
return bool(migrator.is_migration_done)


def _get_headers(request_method: str, headers: dict, payload: str = "") -> dict: # type: ignore[type-arg]
Expand Down
25 changes: 25 additions & 0 deletions api/edge_api/identities/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@

from audit.models import AuditLog
from audit.related_object_type import RelatedObjectType
from edge_api.identities.edge_request_forwarder import (
forward_identity_request as forward_identity_request_service,
)
from edge_api.identities.edge_request_forwarder import (
forward_trait_request as forward_trait_request_service,
)
from edge_api.identities.edge_request_forwarder import (
forward_trait_requests as forward_trait_requests_service,
)
from edge_api.identities.types import IdentityChangeset
from environments.dynamodb import DynamoEnvironmentV2Wrapper
from environments.models import Environment, Webhook
Expand Down Expand Up @@ -172,3 +181,19 @@ def delete_environments_v2_identity_overrides_by_feature(feature_id: int) -> Non
dynamodb_wrapper_v2.delete_identity_overrides(
environment_id=environment.id, feature_id=feature_id
)


forward_identity_request = register_task_handler(
queue_size=2000,
priority=TaskPriority.LOW,
)(forward_identity_request_service)

forward_trait_request = register_task_handler(
queue_size=2000,
priority=TaskPriority.LOW,
)(forward_trait_request_service)

forward_trait_requests = register_task_handler(
queue_size=1000,
priority=TaskPriority.LOW,
)(forward_trait_requests_service)
2 changes: 1 addition & 1 deletion api/environments/identities/traits/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from edge_api.identities.edge_request_forwarder import (
from edge_api.identities.tasks import (
forward_trait_request,
forward_trait_requests,
)
Expand Down
2 changes: 1 addition & 1 deletion api/environments/identities/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from app.pagination import CustomPagination
from core.constants import FLAGSMITH_UPDATED_AT_HEADER
from core.request_origin import RequestOrigin
from edge_api.identities.edge_request_forwarder import forward_identity_request
from edge_api.identities.tasks import forward_identity_request
from environments.identities.models import Identity
from environments.identities.serializers import (
IdentitySerializer,
Expand Down
1 change: 0 additions & 1 deletion api/features/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ def ready(self): # type: ignore[no-untyped-def]

# noinspection PyUnresolvedReferences
import features.signals # noqa
import features.tasks # noqa
7 changes: 0 additions & 7 deletions api/features/feature_health/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,3 @@
class FeatureHealthConfig(BaseAppConfig):
name = "features.feature_health"
default = True

def ready(self): # type: ignore[no-untyped-def]
from features.feature_health.tasks import ( # noqa
update_feature_unhealthy_tag,
)

return super().ready() # type: ignore[no-untyped-call]
6 changes: 3 additions & 3 deletions api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pygithub = "2.1.1"
hubspot-api-client = "^8.2.1"
djangorestframework-dataclasses = "^1.3.1"
pyotp = "^2.9.0"
flagsmith-task-processor = { git = "https://github.com/Flagsmith/flagsmith-task-processor", tag = "v1.3.1" }
flagsmith-task-processor = { git = "https://github.com/Flagsmith/flagsmith-task-processor", tag = "v1.3.2" }
flagsmith-common = { git = "https://github.com/Flagsmith/flagsmith-common", tag = "v1.4.2" }
tzdata = "^2024.1"
djangorestframework-simplejwt = "^5.3.1"
Expand Down
5 changes: 0 additions & 5 deletions api/segments/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,3 @@
class SegmentsConfig(BaseAppConfig):
name = "segments"
default = True

def ready(self) -> None:
super().ready() # type: ignore[no-untyped-call]

import segments.tasks # noqa
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
from edge_api.identities.edge_request_forwarder import (
forward_identity_request,
forward_trait_request,
forward_trait_request_sync,
forward_trait_requests,
)


@pytest.mark.parametrize(
"forwarder_function", [forward_identity_request, forward_trait_request_sync]
"forwarder_function", [forward_identity_request, forward_trait_request]
)
def test_forwarder_function_makes_no_request_if_migration_is_not_yet_done( # type: ignore[no-untyped-def]
mocker, forwarder_mocked_requests, forwarder_mocked_migrator, forwarder_function
Expand Down Expand Up @@ -118,7 +117,7 @@ def test_forward_trait_request_sync_makes_correct_post_request( # type: ignore[
).is_migration_done = mocked_migration_done

# When
forward_trait_request_sync("POST", headers, project_id, payload=request_data)
forward_trait_request("POST", headers, project_id, payload=request_data)

# Then
args, kwargs = forwarder_mocked_requests.post.call_args
Expand Down
Loading