From 00f055297b55f3dfdceb8bd35ec22e1d50bfadfd Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Thu, 21 Dec 2023 09:17:29 +0530 Subject: [PATCH] fix(task-processor): implement grace period for deleting old recurring task (#3169) --- api/task_processor/processor.py | 12 ++++- .../test_unit_task_processor_processor.py | 47 +++++++++++++++++-- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/api/task_processor/processor.py b/api/task_processor/processor.py index a5f6e73afd7b..a920be53cb06 100644 --- a/api/task_processor/processor.py +++ b/api/task_processor/processor.py @@ -1,6 +1,7 @@ import logging import traceback import typing +from datetime import timedelta from django.utils import timezone @@ -14,6 +15,8 @@ logger = logging.getLogger(__name__) +UNREGISTERED_RECURRING_TASK_GRACE_PERIOD = timedelta(minutes=30) + def run_tasks(num_tasks: int = 1) -> typing.List[TaskRun]: if num_tasks < 1: @@ -57,9 +60,14 @@ def run_recurring_tasks(num_tasks: int = 1) -> typing.List[RecurringTaskRun]: task_runs = [] for task in tasks: - # Remove the task if it's not registered anymore if not task.is_task_registered: - task.delete() + # This is necessary to ensure that old instances of the task processor, + # which may still be running during deployment, do not remove tasks added by new instances. + # Reference: https://github.com/Flagsmith/flagsmith/issues/2551 + if ( + timezone.now() - task.created_at + ) > UNREGISTERED_RECURRING_TASK_GRACE_PERIOD: + task.delete() continue if task.should_execute: diff --git a/api/tests/unit/task_processor/test_unit_task_processor_processor.py b/api/tests/unit/task_processor/test_unit_task_processor_processor.py index e954956f4651..749c63b1a90a 100644 --- a/api/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/api/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -1,3 +1,4 @@ +import logging import time import uuid from datetime import timedelta @@ -5,6 +6,7 @@ import pytest from django.utils import timezone +from freezegun import freeze_time from organisations.models import Organisation from task_processor.decorators import ( @@ -19,7 +21,11 @@ TaskResult, TaskRun, ) -from task_processor.processor import run_recurring_tasks, run_tasks +from task_processor.processor import ( + UNREGISTERED_RECURRING_TASK_GRACE_PERIOD, + run_recurring_tasks, + run_tasks, +) from task_processor.task_registry import registered_tasks @@ -150,10 +156,13 @@ def _create_organisation(organisation_name): assert RecurringTaskRun.objects.filter(task=task).count() == 1 -def test_run_recurring_tasks_deletes_the_task_if_it_is_not_registered( - db, run_by_processor -): +def test_run_recurring_tasks_does_nothing_if_unregistered_task_is_new( + db: None, run_by_processor: None, caplog: pytest.LogCaptureFixture +) -> None: # Given + task_processor_logger = logging.getLogger("task_processor") + task_processor_logger.propagate = True + task_identifier = "test_unit_task_processor_processor._a_task" @register_recurring_task(run_every=timedelta(milliseconds=100)) @@ -168,7 +177,35 @@ def _a_task(): # Then assert len(task_runs) == 0 - assert not RecurringTask.objects.filter(task_identifier=task_identifier).exists() + assert RecurringTask.objects.filter(task_identifier=task_identifier).exists() + + +def test_run_recurring_tasks_deletes_the_task_if_unregistered_task_is_old( + db: None, run_by_processor: None, caplog: pytest.LogCaptureFixture +) -> None: + # Given + task_processor_logger = logging.getLogger("task_processor") + task_processor_logger.propagate = True + + task_identifier = "test_unit_task_processor_processor._a_task" + + with freeze_time(timezone.now() - UNREGISTERED_RECURRING_TASK_GRACE_PERIOD): + + @register_recurring_task(run_every=timedelta(milliseconds=100)) + def _a_task(): + pass + + # now - remove the task from the registry + registered_tasks.pop(task_identifier) + + # When + task_runs = run_recurring_tasks() + + # Then + assert len(task_runs) == 0 + assert ( + RecurringTask.objects.filter(task_identifier=task_identifier).exists() is False + ) def test_run_task_runs_task_and_creates_task_run_object_when_failure(db):