-
Notifications
You must be signed in to change notification settings - Fork 429
/
Copy pathserializers.py
156 lines (125 loc) · 5.16 KB
/
serializers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from django.conf import settings
from djoser.conf import settings as djoser_settings
from djoser.serializers import TokenCreateSerializer, UserCreateSerializer
from rest_framework import serializers
from rest_framework.authtoken.models import Token
from rest_framework.exceptions import PermissionDenied
from rest_framework.validators import UniqueValidator
from organisations.invites.models import Invite, InviteLink
from users.auth_type import AuthType
from users.constants import DEFAULT_DELETE_ORPHAN_ORGANISATIONS_VALUE
from users.models import FFAdminUser, SignUpType
from .constants import (
FIELD_BLANK_ERROR,
INVALID_PASSWORD_ERROR,
USER_REGISTRATION_WITHOUT_INVITE_ERROR_MESSAGE,
)
class CustomTokenCreateSerializer(TokenCreateSerializer):
    """
    Token-creation serializer that accepts the user model's USERNAME_FIELD
    as the login key.

    NOTE: Some authentication backends (e.g., LDAP) support only
    username and password authentication. However, the front-end
    currently sends the email as the login key. To accommodate
    this, we override the serializer to rename the username field
    (or any other field configured through djoser's LOGIN_FIELD setting)
    to the email field.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if djoser_settings.LOGIN_FIELD != FFAdminUser.USERNAME_FIELD:
            # djoser has created a field named after djoser_settings.LOGIN_FIELD
            # (e.g. "username") in the serializer. Remove it and expose the
            # FFAdminUser.USERNAME_FIELD (e.g. "email") field instead.
            self.fields.pop(djoser_settings.LOGIN_FIELD)
            self.fields[FFAdminUser.USERNAME_FIELD] = serializers.CharField(
                required=False
            )

    def validate(self, attrs):
        """
        Move the submitted USERNAME_FIELD value under djoser's LOGIN_FIELD
        key, then delegate to the parent validation.

        The replacement field is declared with ``required=False``, so the key
        may legitimately be missing from ``attrs``. In that case nothing is
        remapped and the parent validation decides how to reject the request,
        rather than this method raising ``KeyError``.
        NOTE(review): assumes the parent ``validate`` tolerates a missing
        login key - confirm against the installed djoser version.
        """
        if (
            djoser_settings.LOGIN_FIELD != FFAdminUser.USERNAME_FIELD
            and FFAdminUser.USERNAME_FIELD in attrs
        ):
            attrs[djoser_settings.LOGIN_FIELD] = attrs.pop(FFAdminUser.USERNAME_FIELD)
        return super().validate(attrs)
class CustomTokenSerializer(serializers.ModelSerializer):
    """Model serializer for ``Token`` that exposes only its ``key`` field."""

    class Meta:
        # Only the token key is listed; no other Token attribute is serialized.
        model = Token
        fields = ("key",)
class InviteLinkValidationMixin:
    """
    Mixin adding an invite check for user registration.

    The check is skipped entirely when
    ``settings.ALLOW_REGISTRATION_WITHOUT_INVITE`` is truthy.
    """

    # NOTE(review): the validator below reads the hash from ``initial_data``
    # rather than from validated data; whether this declaration is picked up
    # as a serializer field when mixed in should be confirmed.
    invite_hash = serializers.CharField(required=False, write_only=True)

    def _validate_registration_invite(self, email: str, sign_up_type: str) -> None:
        """
        Ensure the registering user holds an invite matching ``sign_up_type``.

        :param email: address looked up (case-insensitively) among ``Invite``
            rows when signing up through an email invite.
        :param sign_up_type: compared against the ``SignUpType`` values; any
            value other than invite-link / invite-email is treated as having
            no invite.
        :raises PermissionDenied: when no matching invite exists.
        """
        if settings.ALLOW_REGISTRATION_WITHOUT_INVITE:
            return

        if sign_up_type == SignUpType.INVITE_LINK.value:
            # The hash comes straight from the raw request payload.
            submitted_hash = self.initial_data.get("invite_hash")
            has_invite = InviteLink.objects.filter(hash=submitted_hash).exists()
        elif sign_up_type == SignUpType.INVITE_EMAIL.value:
            has_invite = Invite.objects.filter(email__iexact=email.lower()).exists()
        else:
            has_invite = False

        if not has_invite:
            raise PermissionDenied(USER_REGISTRATION_WITHOUT_INVITE_ERROR_MESSAGE)
class CustomUserCreateSerializer(UserCreateSerializer, InviteLinkValidationMixin):
    """
    User registration serializer.

    Extends djoser's ``UserCreateSerializer`` with extra output fields, a
    case-insensitive uniqueness check on the email, an optional auth
    controller check, and the registration invite check.
    """

    # Populated by ``get_key`` with the key of the user's auth token.
    key = serializers.SerializerMethodField()

    class Meta(UserCreateSerializer.Meta):
        fields = UserCreateSerializer.Meta.fields + (
            "is_active",
            "marketing_consent_given",
            "key",
            "uuid",
        )
        read_only_fields = ("is_active", "uuid")
        # NOTE(review): confirm the installed DRF version still honours
        # ``write_only_fields``; behaviour is left exactly as declared.
        write_only_fields = ("sign_up_type",)
        extra_kwargs = {
            "email": {
                "validators": [
                    # Reject an email that differs from an existing one only by case.
                    UniqueValidator(
                        queryset=FFAdminUser.objects.all(),
                        lookup="iexact",
                        message="Email already exists. Please log in.",
                    )
                ]
            }
        }

    def validate(self, attrs):
        """
        Run parent validation, then the auth-method and invite checks, and
        return ``attrs`` with the email lower-cased.
        """
        attrs = super().validate(attrs)
        submitted_email = attrs.get("email")

        if settings.AUTH_CONTROLLER_INSTALLED:
            # Imported lazily: the module is only used when the setting is on.
            from auth_controller.controller import (
                is_authentication_method_valid,
            )

            request = self.context.get("request")
            is_authentication_method_valid(
                request, email=submitted_email, raise_exception=True
            )

        requested_sign_up_type = attrs.get("sign_up_type")
        self._validate_registration_invite(
            email=submitted_email, sign_up_type=requested_sign_up_type
        )

        attrs["email"] = submitted_email.lower()
        return attrs

    @staticmethod
    def get_key(instance):
        """Return the key of ``instance``'s token, creating the token if absent."""
        return Token.objects.get_or_create(user=instance)[0].key
class CustomUserDelete(serializers.Serializer):
    """
    Payload serializer for deleting the requesting user's account.

    ``current_password`` is checked against the requesting user unless the
    user's auth type is Google or GitHub.
    """

    current_password = serializers.CharField(
        style={"input_type": "password"},
        required=False,
        allow_null=True,
        allow_blank=True,
    )
    delete_orphan_organisations = serializers.BooleanField(
        default=DEFAULT_DELETE_ORPHAN_ORGANISATIONS_VALUE, required=False
    )

    default_error_messages = {
        "invalid_password": INVALID_PASSWORD_ERROR,
        "field_blank": FIELD_BLANK_ERROR,
    }

    def validate_current_password(self, value):
        """
        Validate the supplied password against the requesting user.

        Users whose auth type is Google or GitHub skip the check and the
        value is returned untouched. Otherwise an empty value fails with
        ``field_blank`` and a wrong password fails with ``invalid_password``.
        """
        requesting_user = self.context["request"].user

        password_exempt_auth_types = (
            AuthType.GOOGLE.value,
            AuthType.GITHUB.value,
        )
        if requesting_user.auth_type in password_exempt_auth_types:
            return value

        if not value:
            self.fail("field_blank")

        if not requesting_user.check_password(value):
            self.fail("invalid_password")

        return value