-
Notifications
You must be signed in to change notification settings - Fork 429
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(deps): Migrate MFA code to our codebase and bump djangorestframew…
…ork (#3988)
- Loading branch information
1 parent
6e77054
commit e217df7
Showing
39 changed files
with
1,163 additions
and
489 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,28 @@ | ||
from trench.backends.application import ApplicationBackend | ||
from typing import Any, Dict | ||
|
||
from django.conf import settings | ||
from pyotp import TOTP | ||
from rest_framework.response import Response | ||
|
||
from custom_auth.mfa.trench.models import MFAMethod | ||
|
||
|
||
class CustomApplicationBackend: | ||
def __init__(self, mfa_method: MFAMethod, config: Dict[str, Any]) -> None: | ||
self._mfa_method = mfa_method | ||
self._config = config | ||
self._totp = TOTP(self._mfa_method.secret) | ||
|
||
class CustomApplicationBackend(ApplicationBackend): | ||
def dispatch_message(self): | ||
original_message = super(CustomApplicationBackend, self).dispatch_message() | ||
data = {**original_message, "secret": self.obj.secret} | ||
return data | ||
qr_link = self._totp.provisioning_uri( | ||
self._mfa_method.user.email, settings.TRENCH_AUTH["APPLICATION_ISSUER_NAME"] | ||
) | ||
data = { | ||
"qr_link": qr_link, | ||
"secret": self._mfa_method.secret, | ||
} | ||
return Response(data) | ||
|
||
def validate_code(self, code: str) -> bool: | ||
validity_period = settings.TRENCH_AUTH["MFA_METHODS"]["app"]["VALIDITY_PERIOD"] | ||
return self._totp.verify(otp=code, valid_window=int(validity_period / 20)) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from django.contrib import admin | ||
|
||
from custom_auth.mfa.trench.models import MFAMethod | ||
|
||
|
||
@admin.register(MFAMethod) | ||
class MFAMethodAdmin(admin.ModelAdmin): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from django.apps import AppConfig | ||
|
||
|
||
class TrenchConfig(AppConfig): | ||
name = "custom_auth.mfa.trench" | ||
verbose_name = "django-trench" |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from typing import Callable, Set, Type | ||
|
||
from custom_auth.mfa.trench.command.generate_backup_codes import ( | ||
generate_backup_codes_command, | ||
) | ||
from custom_auth.mfa.trench.command.replace_mfa_method_backup_codes import ( | ||
regenerate_backup_codes_for_mfa_method_command, | ||
) | ||
from custom_auth.mfa.trench.models import MFAMethod | ||
from custom_auth.mfa.trench.utils import get_mfa_model | ||
|
||
|
||
class ActivateMFAMethodCommand: | ||
def __init__( | ||
self, mfa_model: Type[MFAMethod], backup_codes_generator: Callable | ||
) -> None: | ||
self._mfa_model = mfa_model | ||
self._backup_codes_generator = backup_codes_generator | ||
|
||
def execute(self, user_id: int, name: str, code: str) -> Set[str]: | ||
self._mfa_model.objects.filter(user_id=user_id, name=name).update( | ||
is_active=True, | ||
is_primary=not self._mfa_model.objects.primary_exists(user_id=user_id), | ||
) | ||
|
||
backup_codes = regenerate_backup_codes_for_mfa_method_command( | ||
user_id=user_id, | ||
name=name, | ||
) | ||
|
||
return backup_codes | ||
|
||
|
||
activate_mfa_method_command = ActivateMFAMethodCommand( | ||
mfa_model=get_mfa_model(), | ||
backup_codes_generator=generate_backup_codes_command, | ||
).execute |
36 changes: 36 additions & 0 deletions
36
api/custom_auth/mfa/trench/command/authenticate_second_factor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from custom_auth.mfa.trench.command.remove_backup_code import ( | ||
remove_backup_code_command, | ||
) | ||
from custom_auth.mfa.trench.command.validate_backup_code import ( | ||
validate_backup_code_command, | ||
) | ||
from custom_auth.mfa.trench.exceptions import ( | ||
InvalidCodeError, | ||
InvalidTokenError, | ||
) | ||
from custom_auth.mfa.trench.models import MFAMethod | ||
from custom_auth.mfa.trench.utils import get_mfa_handler, user_token_generator | ||
from users.models import FFAdminUser | ||
|
||
|
||
def is_authenticated(user_id: int, code: str) -> None: | ||
for auth_method in MFAMethod.objects.list_active(user_id=user_id): | ||
validated_backup_code = validate_backup_code_command( | ||
value=code, backup_codes=auth_method.backup_codes | ||
) | ||
if get_mfa_handler(mfa_method=auth_method).validate_code(code=code): | ||
return | ||
if validated_backup_code: | ||
remove_backup_code_command( | ||
user_id=auth_method.user_id, method_name=auth_method.name, code=code | ||
) | ||
return | ||
raise InvalidCodeError() | ||
|
||
|
||
def authenticate_second_step_command(code: str, ephemeral_token: str) -> FFAdminUser: | ||
user = user_token_generator.check_token(user=None, token=ephemeral_token) | ||
if user is None: | ||
raise InvalidTokenError() | ||
is_authenticated(user_id=user.id, code=code) | ||
return user |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from typing import Callable, Type | ||
|
||
from custom_auth.mfa.trench.command.create_secret import create_secret_command | ||
from custom_auth.mfa.trench.exceptions import MFAMethodAlreadyActiveError | ||
from custom_auth.mfa.trench.models import MFAMethod | ||
from custom_auth.mfa.trench.utils import get_mfa_model | ||
|
||
|
||
class CreateMFAMethodCommand: | ||
def __init__(self, secret_generator: Callable, mfa_model: Type[MFAMethod]) -> None: | ||
self._mfa_model = mfa_model | ||
self._create_secret = secret_generator | ||
|
||
def execute(self, user_id: int, name: str) -> MFAMethod: | ||
mfa, created = self._mfa_model.objects.get_or_create( | ||
user_id=user_id, | ||
name=name, | ||
defaults={ | ||
"secret": self._create_secret, | ||
"is_active": False, | ||
}, | ||
) | ||
if not created and mfa.is_active: | ||
raise MFAMethodAlreadyActiveError() | ||
return mfa | ||
|
||
|
||
create_mfa_method_command = CreateMFAMethodCommand( | ||
secret_generator=create_secret_command, mfa_model=get_mfa_model() | ||
).execute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from django.conf import settings | ||
from pyotp import random_base32 | ||
|
||
|
||
def create_secret_command() -> str: | ||
generator = random_base32 | ||
return generator(length=settings.TRENCH_AUTH["SECRET_KEY_LENGTH"]) |
27 changes: 27 additions & 0 deletions
27
api/custom_auth/mfa/trench/command/deactivate_mfa_method.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from typing import Type | ||
|
||
from django.db.transaction import atomic | ||
|
||
from custom_auth.mfa.trench.exceptions import MFANotEnabledError | ||
from custom_auth.mfa.trench.models import MFAMethod | ||
from custom_auth.mfa.trench.utils import get_mfa_model | ||
|
||
|
||
class DeactivateMFAMethodCommand: | ||
def __init__(self, mfa_model: Type[MFAMethod]) -> None: | ||
self._mfa_model = mfa_model | ||
|
||
@atomic | ||
def execute(self, mfa_method_name: str, user_id: int) -> None: | ||
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=mfa_method_name) | ||
if not mfa.is_active: | ||
raise MFANotEnabledError() | ||
|
||
self._mfa_model.objects.filter(user_id=user_id, name=mfa_method_name).update( | ||
is_active=False, is_primary=False | ||
) | ||
|
||
|
||
deactivate_mfa_method_command = DeactivateMFAMethodCommand( | ||
mfa_model=get_mfa_model() | ||
).execute |
38 changes: 38 additions & 0 deletions
38
api/custom_auth/mfa/trench/command/generate_backup_codes.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from typing import Callable, Set | ||
|
||
from django.conf import settings | ||
from django.utils.crypto import get_random_string | ||
|
||
|
||
class GenerateBackupCodesCommand: | ||
def __init__(self, random_string_generator: Callable) -> None: | ||
self._random_string_generator = random_string_generator | ||
|
||
def execute( | ||
self, | ||
quantity: int = settings.TRENCH_AUTH["BACKUP_CODES_QUANTITY"], | ||
length: int = settings.TRENCH_AUTH["BACKUP_CODES_LENGTH"], | ||
allowed_chars: str = settings.TRENCH_AUTH["BACKUP_CODES_CHARACTERS"], | ||
) -> Set[str]: | ||
""" | ||
Generates random encrypted backup codes. | ||
:param quantity: How many codes should be generated | ||
:type quantity: int | ||
:param length: How long codes should be | ||
:type length: int | ||
:param allowed_chars: Characters to create backup codes from | ||
:type allowed_chars: str | ||
:returns: Encrypted backup codes | ||
:rtype: set[str] | ||
""" | ||
return { | ||
self._random_string_generator(length, allowed_chars) | ||
for _ in range(quantity) | ||
} | ||
|
||
|
||
generate_backup_codes_command = GenerateBackupCodesCommand( | ||
random_string_generator=get_random_string, | ||
).execute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
from typing import Any, Optional, Set | ||
|
||
from django.contrib.auth.hashers import check_password | ||
|
||
from custom_auth.mfa.trench.models import MFAMethod | ||
|
||
|
||
def remove_backup_code_command(user_id: Any, method_name: str, code: str) -> None: | ||
serialized_codes = ( | ||
MFAMethod.objects.filter(user_id=user_id, name=method_name) | ||
.values_list("_backup_codes", flat=True) | ||
.first() | ||
) | ||
codes = MFAMethod._BACKUP_CODES_DELIMITER.join( | ||
_remove_code_from_set( | ||
backup_codes=set(serialized_codes.split(MFAMethod._BACKUP_CODES_DELIMITER)), | ||
code=code, | ||
) | ||
) | ||
MFAMethod.objects.filter(user_id=user_id, name=method_name).update( | ||
_backup_codes=codes | ||
) | ||
|
||
|
||
def _remove_code_from_set(backup_codes: Set[str], code: str) -> Optional[Set[str]]: | ||
for backup_code in backup_codes: | ||
if check_password(code, backup_code): | ||
backup_codes.remove(backup_code) | ||
return backup_codes |
39 changes: 39 additions & 0 deletions
39
api/custom_auth/mfa/trench/command/replace_mfa_method_backup_codes.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from typing import Callable, Set, Type | ||
|
||
from django.contrib.auth.hashers import make_password | ||
|
||
from custom_auth.mfa.trench.command.generate_backup_codes import ( | ||
generate_backup_codes_command, | ||
) | ||
from custom_auth.mfa.trench.models import MFAMethod | ||
from custom_auth.mfa.trench.utils import get_mfa_model | ||
|
||
|
||
class RegenerateBackupCodesForMFAMethodCommand: | ||
def __init__( | ||
self, | ||
mfa_model: Type[MFAMethod], | ||
code_hasher: Callable, | ||
codes_generator: Callable, | ||
) -> None: | ||
self._mfa_model = mfa_model | ||
self._code_hasher = code_hasher | ||
self._codes_generator = codes_generator | ||
|
||
def execute(self, user_id: int, name: str) -> Set[str]: | ||
backup_codes = self._codes_generator() | ||
self._mfa_model.objects.filter(user_id=user_id, name=name).update( | ||
_backup_codes=MFAMethod._BACKUP_CODES_DELIMITER.join( | ||
[self._code_hasher(backup_code) for backup_code in backup_codes] | ||
), | ||
) | ||
return backup_codes | ||
|
||
|
||
regenerate_backup_codes_for_mfa_method_command = ( | ||
RegenerateBackupCodesForMFAMethodCommand( | ||
mfa_model=get_mfa_model(), | ||
code_hasher=make_password, | ||
codes_generator=generate_backup_codes_command, | ||
).execute | ||
) |
10 changes: 10 additions & 0 deletions
10
api/custom_auth/mfa/trench/command/validate_backup_code.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from typing import Iterable, Optional | ||
|
||
from django.contrib.auth.hashers import check_password | ||
|
||
|
||
def validate_backup_code_command(value: str, backup_codes: Iterable) -> Optional[str]: | ||
for backup_code in backup_codes: | ||
if check_password(value, backup_code): | ||
return backup_code | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from django.utils.translation import gettext_lazy as _ | ||
from rest_framework.serializers import ValidationError | ||
|
||
|
||
class MFAValidationError(ValidationError): | ||
def __str__(self) -> str: | ||
return ", ".join(detail for detail in self.detail) | ||
|
||
|
||
class CodeInvalidOrExpiredError(MFAValidationError): | ||
def __init__(self) -> None: | ||
super().__init__( | ||
detail=_("Code invalid or expired."), | ||
code="code_invalid_or_expired", | ||
) | ||
|
||
|
||
class MFAMethodDoesNotExistError(MFAValidationError): | ||
def __init__(self) -> None: | ||
super().__init__( | ||
detail=_("Requested MFA method does not exist."), | ||
code="mfa_method_does_not_exist", | ||
) | ||
|
||
|
||
class MFAMethodAlreadyActiveError(MFAValidationError): | ||
def __init__(self) -> None: | ||
super().__init__( | ||
detail=_("MFA method already active."), | ||
code="method_already_active", | ||
) | ||
|
||
|
||
class MFANotEnabledError(MFAValidationError): | ||
def __init__(self) -> None: | ||
super().__init__(detail=_("2FA is not enabled."), code="not_enabled") | ||
|
||
|
||
class InvalidTokenError(MFAValidationError): | ||
def __init__(self) -> None: | ||
super().__init__(detail=_("Invalid or expired token."), code="invalid_token") | ||
|
||
|
||
class InvalidCodeError(MFAValidationError): | ||
def __init__(self) -> None: | ||
super().__init__(detail=_("Invalid or expired code."), code="invalid_code") |
Oops, something went wrong.