Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build improvements #569

Merged
merged 10 commits into from
May 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions MANIFEST.in

This file was deleted.

2 changes: 1 addition & 1 deletion django_q/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VERSION = (1, 3, 6)
VERSION = (1, 3, 7)

default_app_config = "django_q.apps.DjangoQConfig"

Expand Down
8 changes: 4 additions & 4 deletions django_q/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.utils.translation import gettext_lazy as _

from django_q.conf import Conf, croniter
from django_q.models import Success, Failure, Schedule, OrmQ
from django_q.models import Failure, OrmQ, Schedule, Success
from django_q.tasks import async_task


Expand Down Expand Up @@ -60,7 +60,7 @@ def get_readonly_fields(self, request, obj=None):


class ScheduleAdmin(admin.ModelAdmin):
""" model admin for schedules """
"""model admin for schedules"""

list_display = (
"id",
Expand All @@ -84,7 +84,7 @@ class ScheduleAdmin(admin.ModelAdmin):


class QueueAdmin(admin.ModelAdmin):
""" queue admin for ORM broker """
"""queue admin for ORM broker"""

list_display = ("id", "key", "task_id", "name", "func", "lock")

Expand All @@ -100,7 +100,7 @@ def get_queryset(self, request):
def has_add_permission(self, request):
"""Don't allow adds."""
return False

list_filter = ("key",)


Expand Down
2 changes: 1 addition & 1 deletion django_q/brokers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
from typing import Optional

from django.core.cache import caches, InvalidCacheBackendError
from django.core.cache import InvalidCacheBackendError, caches

from django_q.conf import Conf

Expand Down
6 changes: 4 additions & 2 deletions django_q/brokers/aws_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def dequeue(self):
if not isinstance(wait_time_second, int):
raise ValueError("receive_message_wait_time_seconds should be int")
if wait_time_second > 20:
raise ValueError("receive_message_wait_time_seconds is invalid. Reason: Must be >= 0 and <= 20")
raise ValueError(
"receive_message_wait_time_seconds is invalid. Reason: Must be >= 0 and <= 20"
)
params.update({"WaitTimeSeconds": wait_time_second})

tasks = self.queue.receive_messages(**params)
Expand Down Expand Up @@ -80,7 +82,7 @@ def get_connection(list_key: str = Conf.PREFIX) -> Session:
config["region_name"] = config["aws_region"]
del config["aws_region"]

if 'receive_message_wait_time_seconds' in config:
if "receive_message_wait_time_seconds" in config:
del config["receive_message_wait_time_seconds"]

return Session(**config)
Expand Down
3 changes: 2 additions & 1 deletion django_q/brokers/disque.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

# External
import redis
from redis import Redis

# Django
from django.utils.translation import gettext_lazy as _
from redis import Redis

from django_q.brokers import Broker
from django_q.conf import Conf

Expand Down
46 changes: 30 additions & 16 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
import socket
import traceback
import uuid
from datetime import datetime
from multiprocessing import Event, Process, Value, current_process
from time import sleep

# External
import arrow

# Django
from django import db, core
from django import core, db
from django.apps.registry import apps

try:
apps.check_apps_ready()
except core.exceptions.AppRegistryNotReady:
import django

django.setup()

from django.conf import settings
Expand All @@ -28,21 +30,21 @@

# Local
import django_q.tasks
from django_q.brokers import get_broker, Broker
from django_q.brokers import Broker, get_broker
from django_q.conf import (
Conf,
croniter,
error_reporter,
get_ppid,
logger,
psutil,
get_ppid,
error_reporter,
croniter,
resource,
)
from django_q.humanhash import humanize
from django_q.models import Task, Success, Schedule
from django_q.models import Schedule, Success, Task
from django_q.queues import Queue
from django_q.signals import pre_execute
from django_q.signing import SignedPackage, BadSignature
from django_q.signing import BadSignature, SignedPackage
from django_q.status import Stat, Status


Expand Down Expand Up @@ -485,8 +487,11 @@ def save_task(task, broker: Broker):
existing_task.attempt_count = existing_task.attempt_count + 1
existing_task.save()

if Conf.MAX_ATTEMPTS > 0 and existing_task.attempt_count >= Conf.MAX_ATTEMPTS:
broker.acknowledge(task['ack_id'])
if (
Conf.MAX_ATTEMPTS > 0
and existing_task.attempt_count >= Conf.MAX_ATTEMPTS
):
broker.acknowledge(task["ack_id"])

else:
func = task["func"]
Expand All @@ -495,8 +500,8 @@ def save_task(task, broker: Broker):
func = f"{func.__module__}.{func.__name__}"
elif inspect.ismethod(func):
func = (
f'{func.__self__.__module__}.'
f'{func.__self__.__name__}.{func.__name__}'
f"{func.__self__.__module__}."
f"{func.__self__.__name__}.{func.__name__}"
)
Task.objects.create(
id=task["id"],
Expand All @@ -510,7 +515,7 @@ def save_task(task, broker: Broker):
result=task["result"],
group=task.get("group"),
success=task["success"],
attempt_count=1
attempt_count=1,
)
except Exception as e:
logger.error(e)
Expand Down Expand Up @@ -582,7 +587,9 @@ def scheduler(broker: Broker = None):
Schedule.objects.select_for_update()
.exclude(repeats=0)
.filter(next_run__lt=timezone.now())
.filter(db.models.Q(cluster__isnull=True) | db.models.Q(cluster=Conf.PREFIX))
.filter(
db.models.Q(cluster__isnull=True) | db.models.Q(cluster=Conf.PREFIX)
)
):
args = ()
kwargs = {}
Expand Down Expand Up @@ -627,7 +634,7 @@ def scheduler(broker: Broker = None):
)
)
next_run = arrow.get(
croniter(s.cron, timezone.localtime()).get_next()
croniter(s.cron, localtime()).get_next()
)
if Conf.CATCH_UP or next_run > arrow.utcnow():
break
Expand All @@ -643,7 +650,7 @@ def scheduler(broker: Broker = None):
scheduled_broker = broker
try:
scheduled_broker = get_broker(q_options["broker_name"])
except: # invalid broker_name or non existing broker with broker_name
except: # invalid broker_name or non existing broker with broker_name
pass
q_options["broker"] = scheduled_broker
q_options["group"] = q_options.get("group", s.name or s.id)
Expand Down Expand Up @@ -734,4 +741,11 @@ def rss_check():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >= Conf.MAX_RSS
elif psutil:
return psutil.Process().memory_info().rss >= Conf.MAX_RSS * 1024
return False
return False


def localtime() -> datetime:
"""" Override for timezone.localtime to deal with naive times and local times"""
if settings.USE_TZ:
return timezone.localtime()
return datetime.now()
16 changes: 9 additions & 7 deletions django_q/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,13 @@ class Conf:
RETRY = conf.get("retry", 60)

# Verify if retry and timeout settings are correct
if not TIMEOUT or (TIMEOUT > RETRY):
warn("""Retry and timeout are misconfigured. Set retry larger than timeout,
if not TIMEOUT or (TIMEOUT > RETRY):
warn(
"""Retry and timeout are misconfigured. Set retry larger than timeout,
failure to do so will cause the tasks to be retriggered before completion.
See https://django-q.readthedocs.io/en/latest/configure.html#retry for details.""")

See https://django-q.readthedocs.io/en/latest/configure.html#retry for details."""
)

# Sets the amount of tasks the cluster will try to pop off the broker.
# If it supports bulk gets.
BULK = conf.get("bulk", 1)
Expand Down Expand Up @@ -176,7 +177,7 @@ class Conf:
ERROR_REPORTER = conf.get("error_reporter", {})

# Optional attempt count. set to 0 for infinite attempts
MAX_ATTEMPTS = conf.get('max_attempts', 0)
MAX_ATTEMPTS = conf.get("max_attempts", 0)

# OSX doesn't implement qsize because of missing sem_getvalue()
try:
Expand All @@ -200,7 +201,8 @@ class Conf:

# to manage workarounds during testing
TESTING = conf.get("testing", False)



# logger
logger = logging.getLogger("django-q")

Expand Down
13 changes: 4 additions & 9 deletions django_q/core_signing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@
import time
import zlib

from django.core.signing import (
BadSignature,
SignatureExpired,
b64_decode,
JSONSerializer,
Signer as Sgnr,
TimestampSigner as TsS,
dumps,
)
from django.core.signing import BadSignature, JSONSerializer, SignatureExpired
from django.core.signing import Signer as Sgnr
from django.core.signing import TimestampSigner as TsS
from django.core.signing import b64_decode, dumps
from django.utils import baseconv
from django.utils.crypto import constant_time_compare
from django.utils.encoding import force_bytes, force_str
Expand Down
Loading