-
Notifications
You must be signed in to change notification settings - Fork 429
/
Copy pathreceivers.py
64 lines (49 loc) · 2.55 KB
/
receivers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from django.db.models.signals import post_init, post_save, pre_save
from django.dispatch import receiver
from django.utils import timezone
from environments.tasks import rebuild_environment_document
from features.versioning.models import EnvironmentFeatureVersion
from features.versioning.signals import environment_feature_version_published
from features.versioning.tasks import (
create_environment_feature_version_published_audit_log_task,
trigger_update_version_webhooks,
)
@receiver(post_save, sender=EnvironmentFeatureVersion)
def add_existing_feature_states(  # type: ignore[no-untyped-def]
    instance: EnvironmentFeatureVersion, created: bool, **kwargs
):
    """Seed a freshly created version with the previous version's feature states.

    Runs after every save of an ``EnvironmentFeatureVersion``. Nothing happens
    for updates (``created`` is false) or when ``get_previous_version()``
    returns nothing; otherwise each feature state of the previous version is
    cloned against ``instance`` and its environment.
    """
    if created:
        # Only look up the predecessor for brand-new rows.
        prior_version = instance.get_previous_version()
        if prior_version:
            for prior_state in prior_version.feature_states.all():
                prior_state.clone(
                    env=instance.environment,
                    environment_feature_version=instance,
                )
@receiver(pre_save, sender=EnvironmentFeatureVersion)
def update_live_from(instance: EnvironmentFeatureVersion, **kwargs):  # type: ignore[no-untyped-def]
    """Default ``live_from`` to the current time for published versions.

    Runs before every save. An existing ``live_from`` is never overwritten,
    and unpublished versions are left untouched.
    """
    if not instance.published or instance.live_from:
        return

    instance.live_from = timezone.now()
@receiver(post_init, sender=EnvironmentFeatureVersion)
def cache_fields(instance: EnvironmentFeatureVersion, **kwargs):  # type: ignore[no-untyped-def]
    """Snapshot the ``published`` value as it was when the instance was initialised.

    The snapshot is stored on ``instance._init_fields``.
    NOTE(review): presumably read elsewhere to detect a change in
    ``published`` -- confirm against the model's save/publish logic.
    """
    initial_values = dict(published=instance.published)
    instance._init_fields = initial_values
@receiver(environment_feature_version_published, sender=EnvironmentFeatureVersion)
def update_environment_document(instance: EnvironmentFeatureVersion, **kwargs):  # type: ignore[no-untyped-def]
    """Queue a rebuild of the environment document when a version is published.

    The task receives the id of the version's environment, and the version's
    ``live_from`` is passed as ``delay_until``.
    """
    task_kwargs = {"environment_id": instance.environment_id}
    rebuild_environment_document.delay(
        kwargs=task_kwargs,
        delay_until=instance.live_from,
    )
@receiver(environment_feature_version_published, sender=EnvironmentFeatureVersion)
def trigger_webhooks(instance: EnvironmentFeatureVersion, **kwargs) -> None:  # type: ignore[no-untyped-def]
    """Queue the version-update webhooks task when a version is published.

    The version is identified to the task by its stringified UUID, and the
    version's ``live_from`` is passed as ``delay_until``.
    """
    task_kwargs = {"environment_feature_version_uuid": str(instance.uuid)}
    trigger_update_version_webhooks.delay(
        kwargs=task_kwargs,
        delay_until=instance.live_from,
    )
@receiver(environment_feature_version_published, sender=EnvironmentFeatureVersion)
def create_environment_feature_version_published_audit_log(  # type: ignore[no-untyped-def]
    instance: EnvironmentFeatureVersion, **kwargs
) -> None:
    """Queue the "version published" audit-log task when a version is published.

    The version is identified to the task by its stringified UUID. Unlike the
    sibling receivers, no ``delay_until`` is passed here.
    """
    task_kwargs = {"environment_feature_version_uuid": str(instance.uuid)}
    create_environment_feature_version_published_audit_log_task.delay(
        kwargs=task_kwargs
    )