-
Notifications
You must be signed in to change notification settings - Fork 429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(task-processor): Add priority support #2847
Changes from 7 commits
7afa715
37bcce3
8f1b786
445ece7
2e02a25
f7202ba
a7df921
f25c2ac
46a467a
1bfe023
fa5ea12
802a4f8
ad8b538
1b53f06
f951507
5887a05
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# Generated by Django 3.2.20 on 2023-10-13 06:04 | ||
|
||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('task_processor', '0009_add_recurring_task_run_first_run_at'), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name='task', | ||
name='priority', | ||
field=models.PositiveSmallIntegerField(choices=[(100, 'Lower'), (75, 'Low'), (50, 'Normal'), (25, 'High'), (0, 'Highest')], default=None, null=True), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Generated by Django 3.2.20 on 2023-10-13 04:44 | ||
|
||
from django.db import migrations | ||
|
||
from core.migration_helpers import PostgresOnlyRunSQL | ||
import os | ||
|
||
|
||
class Migration(migrations.Migration): | ||
dependencies = [ | ||
("task_processor", "0010_task_priority"), | ||
] | ||
|
||
operations = [ | ||
PostgresOnlyRunSQL.from_sql_file( | ||
os.path.join( | ||
os.path.dirname(__file__), | ||
"sql", | ||
"0011_get_tasks_to_process.sql", | ||
), | ||
reverse_sql="DROP FUNCTION IF EXISTS get_tasks_to_process", | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
CREATE OR REPLACE FUNCTION get_tasks_to_process(num_tasks integer) | ||
RETURNS SETOF task_processor_task AS $$ | ||
DECLARE | ||
row_to_return task_processor_task; | ||
BEGIN | ||
-- Select the tasks that needs to be processed | ||
FOR row_to_return IN | ||
SELECT * | ||
FROM task_processor_task | ||
WHERE num_failures < 3 AND scheduled_for < NOW() AND completed = FALSE AND is_locked = FALSE | ||
ORDER BY priority ASC, scheduled_for ASC, created_at ASC | ||
gagantrivedi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
LIMIT num_tasks | ||
-- Select for update to ensure that no other workers can select these tasks while in this transaction block | ||
FOR UPDATE SKIP LOCKED | ||
LOOP | ||
-- Lock every selected task(by updating `is_locked` to true) | ||
UPDATE task_processor_task | ||
-- Lock this row by setting is_locked True, so that no other workers can select these tasks after this | ||
-- transaction is complete (but the tasks are still being executed by the current worker) | ||
SET is_locked = TRUE | ||
WHERE id = row_to_return.id; | ||
-- If we don't explicitly update the `is_locked` column here, the client will receive the row that is actually locked but has the `is_locked` value set to `False`. | ||
row_to_return.is_locked := TRUE; | ||
RETURN NEXT row_to_return; | ||
END LOOP; | ||
|
||
RETURN; | ||
END; | ||
$$ LANGUAGE plpgsql | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,14 @@ | |
from task_processor.task_registry import registered_tasks | ||
|
||
|
||
class TaskPriority(models.IntegerChoices): | ||
LOWER = 100 | ||
LOW = 75 | ||
NORMAL = 50 | ||
HIGH = 25 | ||
HIGHEST = 0 | ||
|
||
|
||
class AbstractBaseTask(models.Model): | ||
uuid = models.UUIDField(unique=True, default=uuid.uuid4) | ||
created_at = models.DateTimeField(auto_now_add=True) | ||
|
@@ -74,6 +82,9 @@ class Task(AbstractBaseTask): | |
num_failures = models.IntegerField(default=0) | ||
completed = models.BooleanField(default=False) | ||
objects = TaskManager() | ||
priority = models.PositiveSmallIntegerField( | ||
default=None, null=True, choices=TaskPriority.choices | ||
) | ||
|
||
class Meta: | ||
# We have customised the migration in 0004 to only apply this change to postgres databases | ||
|
@@ -90,12 +101,14 @@ class Meta: | |
def create( | ||
cls, | ||
task_identifier: str, | ||
priority: TaskPriority = TaskPriority.NORMAL, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @matthewelwell think we can get rid of There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can, but I quite like having the helper method to differentiate them. What's the benefit in removing it? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think the name is slightly odd, no? I'd expect it to schedule the task (i.e., put it on some queue), but it only creates the class instance. Does that make sense? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see, you mean that it doesn't actually persist it to the DB? But that's the same as the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've removed the schedule_task method, but now I am not sure if it looks any better 😕 There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Well, there's less code so that's always a good thing. I think I would prefer to keep the logic regarding the queue in the Task model though. Can we just handle that in the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yeah, done |
||
*, | ||
args: typing.Tuple[typing.Any] = None, | ||
kwargs: typing.Dict[str, typing.Any] = None, | ||
) -> "Task": | ||
return Task( | ||
task_identifier=task_identifier, | ||
priority=priority, | ||
serialized_args=cls.serialize_data(args or tuple()), | ||
serialized_kwargs=cls.serialize_data(kwargs or dict()), | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to be highest? I think we basically want to reserve highest for those tasks involved in creating the environment document, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is as important as the environment document, right? We want the identity overrides to be reflected as soon as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say it's slightly lower priority than environment document changes.