Skip to content

Commit

Permalink
Add trench
Browse files Browse the repository at this point in the history
  • Loading branch information
gagantrivedi committed May 22, 2024
1 parent 5e47fb2 commit 7e46a2c
Show file tree
Hide file tree
Showing 42 changed files with 1,674 additions and 11 deletions.
3 changes: 2 additions & 1 deletion api/custom_auth/mfa/backends/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from django.conf import settings
from pyotp import TOTP
from rest_framework.response import Response
from trench.models import MFAMethod

from custom_auth.mfa.trench.models import MFAMethod


class CustomApplicationBackend:
Expand Down
1 change: 1 addition & 0 deletions api/custom_auth/mfa/trench/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.3.1"
7 changes: 7 additions & 0 deletions api/custom_auth/mfa/trench/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.contrib import admin
from trench.models import MFAMethod


@admin.register(MFAMethod)
class MFAMethodAdmin(admin.ModelAdmin):
pass
6 changes: 6 additions & 0 deletions api/custom_auth/mfa/trench/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class TrenchConfig(AppConfig):
name = "trench"
verbose_name = "django-trench"
Empty file.
29 changes: 29 additions & 0 deletions api/custom_auth/mfa/trench/backends/application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging

from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractUser
from trench.backends.base import AbstractMessageDispatcher
from trench.responses import (
DispatchResponse,
FailedDispatchResponse,
SuccessfulDispatchResponse,
)
from trench.settings import trench_settings

User: AbstractUser = get_user_model()


class ApplicationMessageDispatcher(AbstractMessageDispatcher):
def dispatch_message(self) -> DispatchResponse:
try:
qr_link = self._create_qr_link(self._mfa_method.user)
return SuccessfulDispatchResponse(details=qr_link)
except Exception as cause: # pragma: nocover
logging.error(cause, exc_info=True) # pragma: nocover
return FailedDispatchResponse(details=str(cause)) # pragma: nocover

def _create_qr_link(self, user: User) -> str:
return self._get_otp().provisioning_uri(
getattr(user, User.USERNAME_FIELD),
trench_settings.APPLICATION_ISSUER_NAME,
)
85 changes: 85 additions & 0 deletions api/custom_auth/mfa/trench/backends/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple

from django.db.models import Model
from pyotp import TOTP
from trench.command.create_otp import create_otp_command
from trench.exceptions import MissingConfigurationError
from trench.models import MFAMethod
from trench.responses import DispatchResponse
from trench.settings import SOURCE_FIELD, VALIDITY_PERIOD, trench_settings


class AbstractMessageDispatcher(ABC):
def __init__(self, mfa_method: MFAMethod, config: Dict[str, Any]) -> None:
self._mfa_method = mfa_method
self._config = config
self._to = self._get_source_field()

def _get_source_field(self) -> Optional[str]:
if SOURCE_FIELD in self._config:
source = self._get_nested_attr_value(
self._mfa_method.user, self._config[SOURCE_FIELD]
)
if source is None:
raise MissingConfigurationError(
attribute_name=self._config[SOURCE_FIELD]
)
return source
return None

def _get_nested_attr_value(self, obj: Model, path: str) -> Optional[str]:
objects, attr = self._parse_dotted_path(path)
try:
_obj = self._get_innermost_object(obj, objects)
except AttributeError: # pragma: no cover
return None # pragma: no cover
return getattr(_obj, attr)

@staticmethod
def _parse_dotted_path(path: str) -> Tuple[Optional[str], str]:
"""
Extracts attribute name from dotted path.
"""
try:
objects, attr = path.rsplit(".", 1)
return objects, attr
except ValueError:
return None, path

@staticmethod
def _get_innermost_object(obj: Model, dotted_path: Optional[str] = None) -> Model:
"""
For given object return innermost object.
"""
if dotted_path is None:
return obj
for o in dotted_path.split("."):
obj = getattr(obj, o)
return obj # pragma: no cover

@abstractmethod
def dispatch_message(self) -> DispatchResponse:
raise NotImplementedError # pragma: no cover

def create_code(self) -> str:
return self._get_otp().now()

def confirm_activation(self, code: str) -> None:
pass

def validate_confirmation_code(self, code: str) -> bool:
return self.validate_code(code)

def validate_code(self, code: str) -> bool:
return self._get_otp().verify(otp=code)

def _get_otp(self) -> TOTP:
return create_otp_command(
secret=self._mfa_method.secret, interval=self._get_valid_window()
)

def _get_valid_window(self) -> int:
return self._config.get(
VALIDITY_PERIOD, trench_settings.DEFAULT_VALIDITY_PERIOD
)
10 changes: 10 additions & 0 deletions api/custom_auth/mfa/trench/backends/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from trench.backends.base import AbstractMessageDispatcher
from trench.models import MFAMethod
from trench.query.get_mfa_config_by_name import get_mfa_config_by_name_query
from trench.settings import HANDLER


def get_mfa_handler(mfa_method: MFAMethod) -> AbstractMessageDispatcher:
conf = get_mfa_config_by_name_query(name=mfa_method.name)
dispatcher = conf[HANDLER]
return dispatcher(mfa_method=mfa_method, config=conf)
Empty file.
46 changes: 46 additions & 0 deletions api/custom_auth/mfa/trench/command/activate_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Callable, Set, Type

from trench.backends.provider import get_mfa_handler
from trench.command.generate_backup_codes import generate_backup_codes_command
from trench.command.replace_mfa_method_backup_codes import (
regenerate_backup_codes_for_mfa_method_command,
)
from trench.exceptions import MFAMethodDoesNotExistError
from trench.models import MFAMethod
from trench.utils import get_mfa_model


class ActivateMFAMethodCommand:
def __init__(
self, mfa_model: Type[MFAMethod], backup_codes_generator: Callable
) -> None:
self._mfa_model = mfa_model
self._backup_codes_generator = backup_codes_generator

def execute(self, user_id: int, name: str, code: str) -> Set[str]:
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=name)

get_mfa_handler(mfa).confirm_activation(code)

rows_affected = self._mfa_model.objects.filter(
user_id=user_id, name=name
).update(
is_active=True,
is_primary=not self._mfa_model.objects.primary_exists(user_id=user_id),
)

if rows_affected < 1:
raise MFAMethodDoesNotExistError()

backup_codes = regenerate_backup_codes_for_mfa_method_command(
user_id=user_id,
name=name,
)

return backup_codes


activate_mfa_method_command = ActivateMFAMethodCommand(
mfa_model=get_mfa_model(),
backup_codes_generator=generate_backup_codes_command,
).execute
43 changes: 43 additions & 0 deletions api/custom_auth/mfa/trench/command/authenticate_second_factor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Type

from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractUser
from trench.backends.provider import get_mfa_handler
from trench.command.remove_backup_code import remove_backup_code_command
from trench.command.validate_backup_code import validate_backup_code_command
from trench.exceptions import InvalidCodeError, InvalidTokenError
from trench.models import MFAMethod
from trench.utils import get_mfa_model, user_token_generator

User: AbstractUser = get_user_model()


class AuthenticateSecondFactorCommand:
def __init__(self, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model

def execute(self, code: str, ephemeral_token: str) -> User:
user = user_token_generator.check_token(user=None, token=ephemeral_token)
if user is None:
raise InvalidTokenError()
self.is_authenticated(user_id=user.id, code=code)
return user

def is_authenticated(self, user_id: int, code: str) -> None:
for auth_method in self._mfa_model.objects.list_active(user_id=user_id):
validated_backup_code = validate_backup_code_command(
value=code, backup_codes=auth_method.backup_codes
)
if get_mfa_handler(mfa_method=auth_method).validate_code(code=code):
return
if validated_backup_code:
remove_backup_code_command(
user_id=auth_method.user_id, method_name=auth_method.name, code=code
)
return
raise InvalidCodeError()


authenticate_second_step_command = AuthenticateSecondFactorCommand(
mfa_model=get_mfa_model()
).execute
22 changes: 22 additions & 0 deletions api/custom_auth/mfa/trench/command/authenticate_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from django.contrib.auth import authenticate, get_user_model
from django.contrib.auth.models import AbstractUser
from rest_framework.request import Request
from trench.exceptions import UnauthenticatedError

User: AbstractUser = get_user_model()


class AuthenticateUserCommand:
@staticmethod
def execute(request: Request, username: str, password: str) -> User:
user = authenticate(
request=request,
username=username,
password=password,
)
if user is None:
raise UnauthenticatedError()
return user


authenticate_user_command = AuthenticateUserCommand.execute
30 changes: 30 additions & 0 deletions api/custom_auth/mfa/trench/command/create_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Callable, Type

from trench.command.create_secret import create_secret_command
from trench.exceptions import MFAMethodAlreadyActiveError
from trench.models import MFAMethod
from trench.utils import get_mfa_model


class CreateMFAMethodCommand:
def __init__(self, secret_generator: Callable, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model
self._create_secret = secret_generator

def execute(self, user_id: int, name: str) -> MFAMethod:
mfa, created = self._mfa_model.objects.get_or_create(
user_id=user_id,
name=name,
defaults={
"secret": self._create_secret,
"is_active": False,
},
)
if not created and mfa.is_active:
raise MFAMethodAlreadyActiveError()
return mfa


create_mfa_method_command = CreateMFAMethodCommand(
secret_generator=create_secret_command, mfa_model=get_mfa_model()
).execute
10 changes: 10 additions & 0 deletions api/custom_auth/mfa/trench/command/create_otp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pyotp import TOTP


class CreateOTPCommand:
@staticmethod
def execute(secret: str, interval: int) -> TOTP:
return TOTP(secret, interval=interval)


create_otp_command = CreateOTPCommand.execute
18 changes: 18 additions & 0 deletions api/custom_auth/mfa/trench/command/create_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import Callable

from pyotp import random_base32
from trench.settings import TrenchAPISettings, trench_settings


class CreateSecretCommand:
def __init__(self, generator: Callable, settings: TrenchAPISettings) -> None:
self._generator = generator
self._settings = settings

def execute(self) -> str:
return self._generator(length=self._settings.SECRET_KEY_LENGTH)


create_secret_command = CreateSecretCommand(
generator=random_base32, settings=trench_settings
).execute
34 changes: 34 additions & 0 deletions api/custom_auth/mfa/trench/command/deactivate_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Type

from django.db.transaction import atomic
from trench.exceptions import (
DeactivationOfPrimaryMFAMethodError,
MFANotEnabledError,
)
from trench.models import MFAMethod
from trench.utils import get_mfa_model


class DeactivateMFAMethodCommand:
def __init__(self, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model

@atomic
def execute(self, mfa_method_name: str, user_id: int) -> None:
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=mfa_method_name)
number_of_active_mfa_methods = self._mfa_model.objects.filter(
user_id=user_id, is_active=True
).count()
if mfa.is_primary and number_of_active_mfa_methods > 1:
raise DeactivationOfPrimaryMFAMethodError()
if not mfa.is_active:
raise MFANotEnabledError()

self._mfa_model.objects.filter(user_id=user_id, name=mfa_method_name).update(
is_active=False, is_primary=False
)


deactivate_mfa_method_command = DeactivateMFAMethodCommand(
mfa_model=get_mfa_model()
).execute
Loading

0 comments on commit 7e46a2c

Please sign in to comment.