Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Missing permissions for integration API endpoints #4530

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 44 additions & 52 deletions api/integrations/common/views.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,76 @@
from django.db.models import QuerySet
from django.shortcuts import get_object_or_404
from rest_framework import viewsets
from rest_framework.exceptions import (
NotFound,
PermissionDenied,
ValidationError,
)
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import BasePermission, IsAuthenticated
from rest_framework.request import Request
from rest_framework.serializers import BaseSerializer

from environments.models import Environment
from environments.permissions.constants import VIEW_ENVIRONMENT
from projects.permissions import VIEW_PROJECT
from environments.permissions.permissions import NestedEnvironmentPermissions
from projects.permissions import VIEW_PROJECT, NestedProjectPermissions


class EnvironmentIntegrationCommonViewSet(viewsets.ModelViewSet):
serializer_class = None
pagination_class = None # set here to ensure documentation is correct
model_class = None

def get_queryset(self):
def initial(self, request: Request, *args, **kwargs) -> None:
super().initial(request, *args, **kwargs)
request.environment = get_object_or_404(
Environment,
api_key=self.kwargs["environment_api_key"],
)

def get_queryset(self) -> QuerySet:
if getattr(self, "swagger_fake_view", False):
return self.model_class.objects.none()

environment_api_key = self.kwargs["environment_api_key"]

try:
environment = Environment.objects.get(api_key=environment_api_key)
if not self.request.user.has_environment_permission(
VIEW_ENVIRONMENT, environment
):
raise PermissionDenied(
"User does not have permission to perform action in environment."
)
return self.model_class.objects.filter(environment=self.request.environment)

return self.model_class.objects.filter(environment=environment)
except Environment.DoesNotExist:
raise NotFound("Environment not found.")
def get_permissions(self) -> list[BasePermission]:
return [
IsAuthenticated(),
NestedEnvironmentPermissions(
action_permission_map={"retrieve": VIEW_ENVIRONMENT},
),
]

def perform_create(self, serializer):
environment = self.get_environment_from_request()

if self.model_class.objects.filter(environment=environment).exists():
def perform_create(self, serializer: BaseSerializer) -> None:
if self.get_queryset().exists():
raise ValidationError(
f"{self.model_class.__name__} for environment already exist."
"This integration already exists for this environment."
)
serializer.save(environment=self.request.environment)

serializer.save(environment=environment)

def perform_update(self, serializer):
environment = self.get_environment_from_request()
serializer.save(environment=environment)

def get_environment_from_request(self):
"""
Get environment object from URL parameters in request.
"""
return Environment.objects.get(api_key=self.kwargs["environment_api_key"])
def perform_update(self, serializer: BaseSerializer) -> None:
serializer.save(environment=self.request.environment)


class ProjectIntegrationBaseViewSet(viewsets.ModelViewSet):
serializer_class = None
pagination_class = None
model_class = None

def get_queryset(self):
def get_queryset(self) -> QuerySet:
if getattr(self, "swagger_fake_view", False):
return self.model_class.objects.none()

project = get_object_or_404(
self.request.user.get_permitted_projects(VIEW_PROJECT),
pk=self.kwargs["project_pk"],
)
return self.model_class.objects.filter(project=project)
return self.model_class.objects.filter(project_id=self.kwargs["project_pk"])

def perform_create(self, serializer):
project_id = self.kwargs["project_pk"]
if self.model_class.objects.filter(project_id=project_id).exists():
raise ValidationError(
f"{self.model_class.__name__} for this project already exists."
)
serializer.save(project_id=project_id)
def get_permissions(self) -> list[BasePermission]:
return [
NestedProjectPermissions(
action_permission_map={"retrieve": VIEW_PROJECT},
),
]

def perform_create(self, serializer: BaseSerializer) -> None:
if self.get_queryset().exists():
raise ValidationError("This integration already exists for this project.")
serializer.save(project_id=self.kwargs["project_pk"])

def perform_update(self, serializer):
project_id = self.kwargs["project_pk"]
serializer.save(project_id=project_id)
def perform_update(self, serializer: BaseSerializer) -> None:
serializer.save(project_id=self.kwargs["project_pk"])
17 changes: 13 additions & 4 deletions api/integrations/slack/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from django.shortcuts import redirect
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.permissions import BasePermission
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from slack_sdk.oauth import AuthorizeUrlGenerator
Expand Down Expand Up @@ -63,26 +64,35 @@ class SlackEnvironmentViewSet(EnvironmentIntegrationCommonViewSet):
pagination_class = None # set here to ensure documentation is correct
model_class = SlackEnvironment

def get_permissions(self) -> list[BasePermission]:
if (action := self.action) in [
"slack_oauth_callback",
"get_temporary_signature",
]:
return []
if action == "slack_oauth_init":
return [OauthInitPermission()]
return super().get_permissions()

@action(detail=False, methods=["GET"], url_path="signature")
def get_temporary_signature(self, request, *args, **kwargs):
return Response({"signature": signer.sign(request.user.id)})

@action(detail=False, methods=["GET"], url_path="callback", permission_classes=[])
@action(detail=False, methods=["GET"], url_path="callback")
def slack_oauth_callback(self, request, environment_api_key):
code = request.GET.get("code")
if not code:
return Response(
"code not found in query params",
status.HTTP_400_BAD_REQUEST,
)
env = self.get_environment_from_request()
validate_state(request.GET.get("state"), request)
bot_token = SlackWrapper().get_bot_token(
code, self._get_slack_callback_url(environment_api_key)
)

SlackConfiguration.objects.update_or_create(
project=env.project, defaults={"api_token": bot_token}
project=request.environment.project, defaults={"api_token": bot_token}
)
return redirect(self._get_front_end_redirect_url())

Expand All @@ -91,7 +101,6 @@ def slack_oauth_callback(self, request, environment_api_key):
methods=["GET"],
url_path="oauth",
authentication_classes=[OauthInitAuthentication],
permission_classes=[OauthInitPermission],
)
def slack_oauth_init(self, request, environment_api_key):
if not settings.SLACK_CLIENT_ID:
Expand Down
3 changes: 3 additions & 0 deletions api/projects/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ def has_permission(self, request, view):
self.action_permission_map[view.action], project
)

if view.action == "create":
return request.user.is_project_admin(project)

return view.detail

def has_object_permission(self, request, view, obj):
Expand Down
24 changes: 24 additions & 0 deletions api/tests/unit/integrations/datadog/test_unit_datadog_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,27 @@ def test_create_datadog_configuration_in_project_with_deleted_configuration(
response_json = response.json()
assert response_json["api_key"] == api_key
assert response_json["base_url"] == base_url


def test_datadog_project_view__no_permissions__return_expected(
test_user_client: APIClient,
project: Project,
) -> None:
# Given
data = {
"base_url": "http://test.com",
"api_key": "abc-123",
"use_custom_source": True,
}
url = reverse("api-v1:projects:integrations-datadog-list", args=[project.id])

# When
response = test_user_client.post(
url,
data=json.dumps(data),
content_type="application/json",
)

# Then
assert response.status_code == status.HTTP_403_FORBIDDEN
assert not DataDogConfiguration.objects.filter(project=project).exists()
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,30 @@ def test_should_remove_configuration_when_delete(
# Then
assert response.status_code == status.HTTP_204_NO_CONTENT
assert not DynatraceConfiguration.objects.filter(environment=environment).exists()


def test_dynatrace_environment_view__no_permissions__return_expected(
test_user_client: APIClient,
environment: Environment,
) -> None:
# Given
data = {
"base_url": "http://test.com",
"api_key": "abc-123",
"entity_selector": "type(APPLICATION),entityName(docs)",
}
url = reverse(
"api-v1:environments:integrations-dynatrace-list",
args=[environment.api_key],
)

# When
response = test_user_client.post(
url,
data=json.dumps(data),
content_type="application/json",
)

# Then
assert response.status_code == status.HTTP_403_FORBIDDEN
assert not DynatraceConfiguration.objects.filter(environment=environment).exists()
Loading