diff --git a/api/features/import_export/serializers.py b/api/features/import_export/serializers.py index 4645c8d8c75f..a51cf24785d9 100644 --- a/api/features/import_export/serializers.py +++ b/api/features/import_export/serializers.py @@ -66,6 +66,14 @@ class FeatureImportUploadSerializer(serializers.Serializer): strategy = serializers.ChoiceField(choices=[SKIP, OVERWRITE_DESTRUCTIVE]) def save(self, environment_id: int) -> FeatureImport: + if FeatureImport.objects.filter( + environment_id=environment_id, + status=PROCESSING, + ).exists(): + raise ValidationError( + "Can't import features, since already processing a feature import." + ) + uploaded_file = self.validated_data["file"] strategy = self.validated_data["strategy"] file_content = uploaded_file.read().decode("utf-8") diff --git a/api/features/import_export/tasks.py b/api/features/import_export/tasks.py index c93a900db994..f313dfd1718b 100644 --- a/api/features/import_export/tasks.py +++ b/api/features/import_export/tasks.py @@ -34,6 +34,31 @@ def clear_stale_feature_imports_and_exports() -> None: FeatureImport.objects.filter(created_at__lt=two_weeks_ago).delete() +@register_recurring_task( + run_every=timedelta(minutes=10), +) +def retire_stalled_feature_imports_and_exports() -> None: + ten_minutes_ago = timezone.now() - timedelta(minutes=10) + + feature_exports = [] + for feature_export in FeatureExport.objects.filter( + created_at__lt=ten_minutes_ago, + status=PROCESSING, + ): + feature_export.status = FAILED + feature_exports.append(feature_export) + FeatureExport.objects.bulk_update(feature_exports, ["status"]) + + feature_imports = [] + for feature_import in FeatureImport.objects.filter( + created_at__lt=ten_minutes_ago, + status=PROCESSING, + ): + feature_import.status = FAILED + feature_imports.append(feature_import) + FeatureImport.objects.bulk_update(feature_imports, ["status"]) + + def _export_features_for_environment( feature_export: FeatureExport, tag_ids: Optional[list[int]] ) -> None: diff --git a/api/tests/unit/features/import_export/test_unit_features_import_export_tasks.py b/api/tests/unit/features/import_export/test_unit_features_import_export_tasks.py index a86aa0d93906..ffca424897e2 100644 --- a/api/tests/unit/features/import_export/test_unit_features_import_export_tasks.py +++ b/api/tests/unit/features/import_export/test_unit_features_import_export_tasks.py @@ -10,6 +10,7 @@ from environments.models import Environment from features.feature_types import MULTIVARIATE, STANDARD from features.import_export.constants import ( + FAILED, OVERWRITE_DESTRUCTIVE, PROCESSING, SKIP, @@ -25,6 +26,7 @@ clear_stale_feature_imports_and_exports, export_features_for_environment, import_features_for_environment, + retire_stalled_feature_imports_and_exports, ) from features.models import Feature, FeatureSegment, FeatureState from features.multivariate.models import MultivariateFeatureOption @@ -72,6 +74,50 @@ def test_clear_stale_feature_imports_and_exports( kept_feature_export.refresh_from_db() +def test_retire_stalled_feature_imports_and_exports( + db: None, environment: Environment, freezer: FrozenDateTimeFactory +): + # Given + now = timezone.now() + freezer.move_to(now - timedelta(minutes=12)) + to_fail_feature_export = FeatureExport.objects.create( + data="{}", + environment=environment, + status=PROCESSING, + ) + to_fail_feature_import = FeatureImport.objects.create( + data="{}", + environment=environment, + status=PROCESSING, + ) + + freezer.move_to(now) + keep_processing_feature_export = FeatureExport.objects.create( + data="{}", + environment=environment, + status=PROCESSING, + ) + keep_processing_feature_import = FeatureImport.objects.create( + data="{}", + environment=environment, + status=PROCESSING, + ) + + # When + retire_stalled_feature_imports_and_exports() + + # Then + to_fail_feature_import.refresh_from_db() + to_fail_feature_export.refresh_from_db() + keep_processing_feature_import.refresh_from_db() + keep_processing_feature_export.refresh_from_db() + + assert to_fail_feature_import.status == FAILED + assert to_fail_feature_export.status == FAILED + assert keep_processing_feature_import.status == PROCESSING + assert keep_processing_feature_export.status == PROCESSING + + def test_export_and_import_features_for_environment_with_skip( db: None, environment: Environment, diff --git a/api/tests/unit/features/import_export/test_unit_features_import_export_views.py b/api/tests/unit/features/import_export/test_unit_features_import_export_views.py index 123c72809d71..7c45edbd4580 100644 --- a/api/tests/unit/features/import_export/test_unit_features_import_export_views.py +++ b/api/tests/unit/features/import_export/test_unit_features_import_export_views.py @@ -191,6 +191,40 @@ def test_feature_import( assert feature_import.strategy == OVERWRITE_DESTRUCTIVE +def test_feature_import_already_processing( + admin_client: APIClient, + environment: Environment, +) -> None: + # Given + assert FeatureImport.objects.count() == 0 + + # Create a FeatureImport that's processing already. + FeatureImport.objects.create( + environment=environment, + strategy=OVERWRITE_DESTRUCTIVE, + status=PROCESSING, + data="{}", + ) + + url = reverse( + "api-v1:features:feature-import", + args=[environment.id], + ) + + file_data = b"[]" + uploaded_file = SimpleUploadedFile("test.23.json", file_data) + data = {"file": uploaded_file, "strategy": OVERWRITE_DESTRUCTIVE} + + # When + response = admin_client.post(url, data=data, format="multipart") + + # Then + assert response.status_code == 400 + assert response.json() == [ + "Can't import features, since already processing a feature import." + ] + + def test_feature_import_unauthorized( staff_client: APIClient, environment: Environment,