From 9f83f27481d31147173f772728c3e31116ba4548 Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Fri, 22 Dec 2023 15:46:34 +0530 Subject: [PATCH] feat(tasks-processor): Add recurring task to clean up old recurring task runs (#3151) --- api/app/settings/common.py | 3 + api/task_processor/tasks.py | 16 +++- .../test_unit_task_processor_tasks.py | 83 ++++++++++++++++++- 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/api/app/settings/common.py b/api/app/settings/common.py index 96d560285b66..25a046e304fe 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -909,6 +909,9 @@ ) TASK_DELETE_RUN_TIME = env.time("TASK_DELETE_RUN_TIME", default="01:00") TASK_DELETE_RUN_EVERY = env.timedelta("TASK_DELETE_RUN_EVERY", default=86400) +RECURRING_TASK_RUN_RETENTION_DAYS = env.int( + "RECURRING_TASK_RUN_RETENTION_DAYS", default=30 +) # Real time(server sent events) settings SSE_SERVER_BASE_URL = env.str("SSE_SERVER_BASE_URL", None) diff --git a/api/task_processor/tasks.py b/api/task_processor/tasks.py index 78c927a00d43..80de37e17ac2 100644 --- a/api/task_processor/tasks.py +++ b/api/task_processor/tasks.py @@ -9,7 +9,7 @@ register_recurring_task, register_task_handler, ) -from task_processor.models import HealthCheckModel, Task +from task_processor.models import HealthCheckModel, RecurringTaskRun, Task logger = logging.getLogger(__name__) @@ -50,3 +50,17 @@ def clean_up_old_tasks(): 0 : settings.TASK_DELETE_BATCH_SIZE # noqa:E203 ] ).delete() + + +@register_recurring_task( + run_every=settings.TASK_DELETE_RUN_EVERY, + first_run_time=settings.TASK_DELETE_RUN_TIME, +) +def clean_up_old_recurring_task_runs(): + if not settings.ENABLE_CLEAN_UP_OLD_TASKS: + return + + now = timezone.now() + delete_before = now - timedelta(days=settings.RECURRING_TASK_RUN_RETENTION_DAYS) + + RecurringTaskRun.objects.filter(finished_at__lt=delete_before).delete() diff --git a/api/tests/unit/task_processor/test_unit_task_processor_tasks.py b/api/tests/unit/task_processor/test_unit_task_processor_tasks.py index 09acf2a45933..c9c3757401bf 100644 --- a/api/tests/unit/task_processor/test_unit_task_processor_tasks.py +++ b/api/tests/unit/task_processor/test_unit_task_processor_tasks.py @@ -1,9 +1,14 @@ from datetime import timedelta +from typing import Callable from django.utils import timezone +from pytest_django.fixtures import SettingsWrapper -from task_processor.models import Task -from task_processor.tasks import clean_up_old_tasks +from task_processor.models import RecurringTask, RecurringTaskRun, Task +from task_processor.tasks import ( + clean_up_old_recurring_task_runs, + clean_up_old_tasks, +) now = timezone.now() three_days_ago = now - timedelta(days=3) @@ -23,6 +28,17 @@ def test_clean_up_old_tasks_does_nothing_when_no_tasks(db): assert Task.objects.count() == 0 +def test_clean_up_old_recurring_task_runs_does_nothing_when_no_runs(db: None) -> None: + # Given + assert RecurringTaskRun.objects.count() == 0 + + # When + clean_up_old_recurring_task_runs() + + # Then + assert RecurringTaskRun.objects.count() == 0 + + def test_clean_up_old_tasks(settings, django_assert_num_queries, db): # Given settings.TASK_DELETE_RETENTION_DAYS = 2 @@ -71,6 +87,42 @@ def test_clean_up_old_tasks(settings, django_assert_num_queries, db): ] +def test_clean_up_old_recurring_task_runs( + settings: SettingsWrapper, + django_assert_num_queries: Callable[[int], None], + db: None, +) -> None: + # Given + settings.RECURRING_TASK_RUN_RETENTION_DAYS = 2 + settings.ENABLE_CLEAN_UP_OLD_TASKS = True + + recurring_task = RecurringTask.objects.create( + task_identifier="some_identifier", run_every=timedelta(seconds=1) + ) + + # 2 task runs finished before retention period + for _ in range(2): + RecurringTaskRun.objects.create( + started_at=three_days_ago, + task=recurring_task, + finished_at=three_days_ago, + ) + + # a task run that is within the retention period + task_in_retention_period = RecurringTaskRun.objects.create( + task=recurring_task, + started_at=one_day_ago, + finished_at=one_day_ago, + ) + + # When + with django_assert_num_queries(1): + clean_up_old_recurring_task_runs() + + # Then + assert list(RecurringTaskRun.objects.all()) == [task_in_retention_period] + + def test_clean_up_old_tasks_include_failed_tasks( settings, django_assert_num_queries, db ): @@ -106,3 +158,30 @@ def test_clean_up_old_tasks_does_not_run_if_disabled( # Then assert Task.objects.filter(id=task.id).exists() + + +def test_clean_up_old_recurring_task_runs_does_not_run_if_disabled( + settings: SettingsWrapper, + django_assert_num_queries: Callable[[int], None], + db: None, +) -> None: + # Given + settings.RECURRING_TASK_RUN_RETENTION_DAYS = 2 + settings.ENABLE_CLEAN_UP_OLD_TASKS = False + + recurring_task = RecurringTask.objects.create( + task_identifier="some_identifier", run_every=timedelta(seconds=1) + ) + + RecurringTaskRun.objects.create( + started_at=three_days_ago, + task=recurring_task, + finished_at=three_days_ago, + ) + + # When + with django_assert_num_queries(0): + clean_up_old_recurring_task_runs() + + # Then + assert RecurringTaskRun.objects.exists()