Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(deps): Migrate MFA code to our codebase and bump djangorestframework #3988

Merged
merged 11 commits into from
Jun 7, 2024
4 changes: 2 additions & 2 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
"api_keys",
"features.feature_external_resources",
# 2FA
"trench",
"custom_auth.mfa.trench",
# health check plugins
"health_check",
"health_check.db",
Expand Down Expand Up @@ -750,7 +750,6 @@
}

TRENCH_AUTH = {
"FROM_EMAIL": DEFAULT_FROM_EMAIL,
"BACKUP_CODES_QUANTITY": 5,
"BACKUP_CODES_LENGTH": 10, # keep (quantity * length) under 200
"BACKUP_CODES_CHARACTERS": (
Expand All @@ -759,6 +758,7 @@
"DEFAULT_VALIDITY_PERIOD": 30,
"CONFIRM_BACKUP_CODES_REGENERATION_WITH_CODE": True,
"APPLICATION_ISSUER_NAME": "app.bullet-train.io",
"ENCRYPT_BACKUP_CODES": True,
"MFA_METHODS": {
"app": {
"VERBOSE_NAME": "TOTP App",
Expand Down
30 changes: 25 additions & 5 deletions api/custom_auth/mfa/backends/application.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
from trench.backends.application import ApplicationBackend
from typing import Any, Dict

from django.conf import settings
from pyotp import TOTP
from rest_framework.response import Response

from custom_auth.mfa.trench.models import MFAMethod


class CustomApplicationBackend:
def __init__(self, mfa_method: MFAMethod, config: Dict[str, Any]) -> None:
self._mfa_method = mfa_method
self._config = config
self._totp = TOTP(self._mfa_method.secret)

class CustomApplicationBackend(ApplicationBackend):
def dispatch_message(self):
original_message = super(CustomApplicationBackend, self).dispatch_message()
data = {**original_message, "secret": self.obj.secret}
return data
qr_link = self._totp.provisioning_uri(
self._mfa_method.user.email, settings.TRENCH_AUTH["APPLICATION_ISSUER_NAME"]
)
data = {
"qr_link": qr_link,
"secret": self._mfa_method.secret,
}
return Response(data)

def validate_code(self, code: str) -> bool:
validity_period = settings.TRENCH_AUTH["MFA_METHODS"]["app"]["VALIDITY_PERIOD"]
return self._totp.verify(otp=code, valid_window=int(validity_period / 20))
Empty file.
8 changes: 8 additions & 0 deletions api/custom_auth/mfa/trench/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.contrib import admin

from custom_auth.mfa.trench.models import MFAMethod


@admin.register(MFAMethod)
class MFAMethodAdmin(admin.ModelAdmin):
pass
6 changes: 6 additions & 0 deletions api/custom_auth/mfa/trench/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class TrenchConfig(AppConfig):
name = "custom_auth.mfa.trench"
verbose_name = "django-trench"
Empty file.
Empty file.
37 changes: 37 additions & 0 deletions api/custom_auth/mfa/trench/command/activate_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Callable, Set, Type

from custom_auth.mfa.trench.command.generate_backup_codes import (
generate_backup_codes_command,
)
from custom_auth.mfa.trench.command.replace_mfa_method_backup_codes import (
regenerate_backup_codes_for_mfa_method_command,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class ActivateMFAMethodCommand:
def __init__(
self, mfa_model: Type[MFAMethod], backup_codes_generator: Callable
) -> None:
self._mfa_model = mfa_model
self._backup_codes_generator = backup_codes_generator

def execute(self, user_id: int, name: str, code: str) -> Set[str]:
self._mfa_model.objects.filter(user_id=user_id, name=name).update(
is_active=True,
is_primary=not self._mfa_model.objects.primary_exists(user_id=user_id),
)

backup_codes = regenerate_backup_codes_for_mfa_method_command(
user_id=user_id,
name=name,
)

return backup_codes


activate_mfa_method_command = ActivateMFAMethodCommand(
mfa_model=get_mfa_model(),
backup_codes_generator=generate_backup_codes_command,
).execute
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from custom_auth.mfa.trench.command.remove_backup_code import (
remove_backup_code_command,
)
from custom_auth.mfa.trench.command.validate_backup_code import (
validate_backup_code_command,
)
from custom_auth.mfa.trench.exceptions import (
InvalidCodeError,
InvalidTokenError,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_handler, user_token_generator
from users.models import FFAdminUser


def is_authenticated(user_id: int, code: str) -> None:
for auth_method in MFAMethod.objects.list_active(user_id=user_id):
validated_backup_code = validate_backup_code_command(
value=code, backup_codes=auth_method.backup_codes
)
if get_mfa_handler(mfa_method=auth_method).validate_code(code=code):
return
if validated_backup_code:
remove_backup_code_command(
user_id=auth_method.user_id, method_name=auth_method.name, code=code
)
return
raise InvalidCodeError()


def authenticate_second_step_command(code: str, ephemeral_token: str) -> FFAdminUser:
user = user_token_generator.check_token(user=None, token=ephemeral_token)
if user is None:
raise InvalidTokenError()
is_authenticated(user_id=user.id, code=code)
return user
30 changes: 30 additions & 0 deletions api/custom_auth/mfa/trench/command/create_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Callable, Type

from custom_auth.mfa.trench.command.create_secret import create_secret_command
from custom_auth.mfa.trench.exceptions import MFAMethodAlreadyActiveError
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class CreateMFAMethodCommand:
def __init__(self, secret_generator: Callable, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model
self._create_secret = secret_generator

def execute(self, user_id: int, name: str) -> MFAMethod:
mfa, created = self._mfa_model.objects.get_or_create(
user_id=user_id,
name=name,
defaults={
"secret": self._create_secret,
"is_active": False,
},
)
if not created and mfa.is_active:
raise MFAMethodAlreadyActiveError()
return mfa


create_mfa_method_command = CreateMFAMethodCommand(
secret_generator=create_secret_command, mfa_model=get_mfa_model()
).execute
7 changes: 7 additions & 0 deletions api/custom_auth/mfa/trench/command/create_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.conf import settings
from pyotp import random_base32


def create_secret_command() -> str:
generator = random_base32
return generator(length=settings.TRENCH_AUTH["SECRET_KEY_LENGTH"])
27 changes: 27 additions & 0 deletions api/custom_auth/mfa/trench/command/deactivate_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Type

from django.db.transaction import atomic

from custom_auth.mfa.trench.exceptions import MFANotEnabledError
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class DeactivateMFAMethodCommand:
def __init__(self, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model

@atomic
def execute(self, mfa_method_name: str, user_id: int) -> None:
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=mfa_method_name)
if not mfa.is_active:
raise MFANotEnabledError()

self._mfa_model.objects.filter(user_id=user_id, name=mfa_method_name).update(
is_active=False, is_primary=False
)


deactivate_mfa_method_command = DeactivateMFAMethodCommand(
mfa_model=get_mfa_model()
).execute
38 changes: 38 additions & 0 deletions api/custom_auth/mfa/trench/command/generate_backup_codes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Callable, Set

from django.conf import settings
from django.utils.crypto import get_random_string


class GenerateBackupCodesCommand:
def __init__(self, random_string_generator: Callable) -> None:
self._random_string_generator = random_string_generator

def execute(
self,
quantity: int = settings.TRENCH_AUTH["BACKUP_CODES_QUANTITY"],
length: int = settings.TRENCH_AUTH["BACKUP_CODES_LENGTH"],
allowed_chars: str = settings.TRENCH_AUTH["BACKUP_CODES_CHARACTERS"],
) -> Set[str]:
"""
Generates random encrypted backup codes.

:param quantity: How many codes should be generated
:type quantity: int
:param length: How long codes should be
:type length: int
:param allowed_chars: Characters to create backup codes from
:type allowed_chars: str

:returns: Encrypted backup codes
:rtype: set[str]
"""
return {
self._random_string_generator(length, allowed_chars)
for _ in range(quantity)
}


generate_backup_codes_command = GenerateBackupCodesCommand(
random_string_generator=get_random_string,
).execute
29 changes: 29 additions & 0 deletions api/custom_auth/mfa/trench/command/remove_backup_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Any, Optional, Set

from django.contrib.auth.hashers import check_password

from custom_auth.mfa.trench.models import MFAMethod


def remove_backup_code_command(user_id: Any, method_name: str, code: str) -> None:
serialized_codes = (
MFAMethod.objects.filter(user_id=user_id, name=method_name)
.values_list("_backup_codes", flat=True)
.first()
)
codes = MFAMethod._BACKUP_CODES_DELIMITER.join(
_remove_code_from_set(
backup_codes=set(serialized_codes.split(MFAMethod._BACKUP_CODES_DELIMITER)),
code=code,
)
)
MFAMethod.objects.filter(user_id=user_id, name=method_name).update(
_backup_codes=codes
)


def _remove_code_from_set(backup_codes: Set[str], code: str) -> Optional[Set[str]]:
for backup_code in backup_codes:
if check_password(code, backup_code):
backup_codes.remove(backup_code)
return backup_codes
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Callable, Set, Type

from django.contrib.auth.hashers import make_password

from custom_auth.mfa.trench.command.generate_backup_codes import (
generate_backup_codes_command,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class RegenerateBackupCodesForMFAMethodCommand:
def __init__(
self,
mfa_model: Type[MFAMethod],
code_hasher: Callable,
codes_generator: Callable,
) -> None:
self._mfa_model = mfa_model
self._code_hasher = code_hasher
self._codes_generator = codes_generator

def execute(self, user_id: int, name: str) -> Set[str]:
backup_codes = self._codes_generator()
self._mfa_model.objects.filter(user_id=user_id, name=name).update(
_backup_codes=MFAMethod._BACKUP_CODES_DELIMITER.join(
[self._code_hasher(backup_code) for backup_code in backup_codes]
),
)
return backup_codes


regenerate_backup_codes_for_mfa_method_command = (
RegenerateBackupCodesForMFAMethodCommand(
mfa_model=get_mfa_model(),
code_hasher=make_password,
codes_generator=generate_backup_codes_command,
).execute
)
10 changes: 10 additions & 0 deletions api/custom_auth/mfa/trench/command/validate_backup_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Iterable, Optional

from django.contrib.auth.hashers import check_password


def validate_backup_code_command(value: str, backup_codes: Iterable) -> Optional[str]:
for backup_code in backup_codes:
if check_password(value, backup_code):
return backup_code
return None
46 changes: 46 additions & 0 deletions api/custom_auth/mfa/trench/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from django.utils.translation import gettext_lazy as _
from rest_framework.serializers import ValidationError


class MFAValidationError(ValidationError):
def __str__(self) -> str:
return ", ".join(detail for detail in self.detail)


class CodeInvalidOrExpiredError(MFAValidationError):
def __init__(self) -> None:
super().__init__(
detail=_("Code invalid or expired."),
code="code_invalid_or_expired",
)


class MFAMethodDoesNotExistError(MFAValidationError):
def __init__(self) -> None:
super().__init__(
detail=_("Requested MFA method does not exist."),
code="mfa_method_does_not_exist",
)


class MFAMethodAlreadyActiveError(MFAValidationError):
def __init__(self) -> None:
super().__init__(
detail=_("MFA method already active."),
code="method_already_active",
)


class MFANotEnabledError(MFAValidationError):
def __init__(self) -> None:
super().__init__(detail=_("2FA is not enabled."), code="not_enabled")


class InvalidTokenError(MFAValidationError):
def __init__(self) -> None:
super().__init__(detail=_("Invalid or expired token."), code="invalid_token")


class InvalidCodeError(MFAValidationError):
def __init__(self) -> None:
super().__init__(detail=_("Invalid or expired code."), code="invalid_code")
Loading
Loading