feat(sse): track usage (#3050)
gagantrivedi authored Dec 6, 2023
1 parent 3f100d2 commit 9502e55
Showing 16 changed files with 295 additions and 6 deletions.
8 changes: 7 additions & 1 deletion .github/actions/api-deploy-ecs/action.yml
@@ -62,6 +62,10 @@ inputs:
description: The flagsmith-rbac git revision to use when building deployment package.
required: false
default: main
sse_pgp_private_key:
description: Private PGP key used for encrypting/decrypting access logs
required: true

outputs:
api_ecr_image_url:
description: The image URL from ECR
@@ -147,9 +151,11 @@ runs:
ECR_REPOSITORY: ${{ inputs.aws_ecr_repository_arn }}
DOCKER_BUILDKIT: '1'
run: |
echo "Load pgp private key"
cat <<< "${{ inputs.sse_pgp_private_key }}" > sse_pgp_pkey
echo "Building docker image with URL: "
echo $ECR_REPOSITORY:$IMAGE_TAG
docker build --secret id=sse_pgp_pkey,src=./sse_pgp_pkey -t $ECR_REPOSITORY:$IMAGE_TAG -f api/Dockerfile --build-arg SAML_INSTALLED=1 --build-arg POETRY_OPTS="--with saml,auth-controller" .
docker push $ECR_REPOSITORY:$IMAGE_TAG
echo "image=$ECR_REPOSITORY:$IMAGE_TAG" >> $GITHUB_OUTPUT
shell: bash
1 change: 1 addition & 0 deletions .github/workflows/api-deploy-production-ecs.yml
@@ -50,6 +50,7 @@ jobs:
aws_ecs_service_name: flagsmith-task-processor-svc-eu-west-2-bf77140
aws_task_definitions_directory_path: infrastructure/aws/production
api_ecr_image_url: ${{ steps.deploy-api.outputs.api_ecr_image_url }}
sse_pgp_private_key: ${{ secrets.SSE_PGP_PRIVATE_KEY }}

run-tests:
runs-on: ubuntu-latest
1 change: 1 addition & 0 deletions .github/workflows/api-deploy-staging-ecs.yml
@@ -51,6 +51,7 @@ jobs:
aws_ecs_service_name: flagsmith-task-processor-svc-eu-west-2-792c644
aws_task_definitions_directory_path: infrastructure/aws/staging
api_ecr_image_url: ${{ steps.deploy-api.outputs.api_ecr_image_url }}
sse_pgp_private_key: ${{ secrets.SSE_PGP_PRIVATE_KEY }}

run-tests:
runs-on: ubuntu-latest
7 changes: 7 additions & 0 deletions api/.env-ci
@@ -1,3 +1,10 @@
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
ANALYTICS_DATABASE_URL=postgres://postgres:postgres@localhost:5432/analytics
PYTEST_ADDOPTS=--cov . --cov-report xml -n auto --dist worksteal

# Used by moto; ref: https://github.com/getmoto/moto/issues/5941
AWS_ACCESS_KEY_ID=testing
AWS_SECRET_ACCESS_KEY=testing
AWS_SECURITY_TOKEN=testing
AWS_SESSION_TOKEN=testing
AWS_DEFAULT_REGION=eu-east-2
5 changes: 5 additions & 0 deletions api/Dockerfile
@@ -29,6 +29,11 @@ RUN if [ "${SAML_INSTALLED}" = "1" ]; then apt-get update && apt-get install -y
ARG TARGETARCH
RUN if [ "${TARGETARCH}" != "amd64" ]; then apt-get update && apt-get install -y libpq-dev && rm -rf /var/lib/apt/lists/*; fi;

# Install GnuPG (and import the private key) if the secret file exists
RUN --mount=type=secret,id=sse_pgp_pkey if [ -f /run/secrets/sse_pgp_pkey ]; then \
    apt-get update && apt-get install -y gnupg && gpg --import /run/secrets/sse_pgp_pkey; fi;


# Copy the python venv from step 2
COPY --from=build-python /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
# Copy the bin folder as well to copy the executables created in package installation
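Because the key is imported into the default keyring at build time, the application can later decrypt log payloads with a bare `gnupg.GPG()` instance, as `sse_service.py` does below. A minimal sketch of that round trip, assuming a hypothetical encrypted file on disk:

```python
import gnupg

# The default keyring already holds the private key imported during the image build.
gpg = gnupg.GPG()

# "access_log.csv.gpg" is a hypothetical stand-in for an encrypted S3 object body.
with open("access_log.csv.gpg", "rb") as f:
    decrypted = gpg.decrypt(f.read())

if decrypted.ok:
    print(decrypted.data.decode())  # plaintext CSV access-log rows
```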
3 changes: 3 additions & 0 deletions api/app/settings/common.py
@@ -907,6 +907,9 @@
# Real time(server sent events) settings
SSE_SERVER_BASE_URL = env.str("SSE_SERVER_BASE_URL", None)
SSE_AUTHENTICATION_TOKEN = env.str("SSE_AUTHENTICATION_TOKEN", None)
AWS_SSE_LOGS_BUCKET_NAME = env.str("AWS_SSE_LOGS_BUCKET_NAME", None)
SSE_INFLUXDB_BUCKET = env.str("SSE_INFLUXDB_BUCKET", None)


DISABLE_INVITE_LINKS = env.bool("DISABLE_INVITE_LINKS", False)

2 changes: 2 additions & 0 deletions api/app/settings/test.py
@@ -11,3 +11,5 @@
"signup": "100/min",
"user": "100000/day",
}

AWS_SSE_LOGS_BUCKET_NAME = "test_bucket"
27 changes: 24 additions & 3 deletions api/poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions api/pyproject.toml
@@ -102,6 +102,7 @@ django-axes = "~5.32.0"
pydantic = "~1.10.9"
pyngo = "~1.6.0"
flagsmith = "^3.4.0"
python-gnupg = "^0.5.1"

[tool.poetry.group.auth-controller]
optional = true
7 changes: 7 additions & 0 deletions api/sse/dataclasses.py
@@ -0,0 +1,7 @@
from dataclasses import dataclass


@dataclass(eq=True)
class SSEAccessLogs:
generated_at: str # ISO 8601
api_key: str
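Each access-log entry is a two-column CSV row (an ISO 8601 timestamp, then the environment API key), so a decrypted line unpacks directly into this dataclass — that is exactly what `stream_access_logs` does below. A quick sketch with an invented row:

```python
import csv
from io import StringIO

from sse.dataclasses import SSEAccessLogs

# One invented log line: generated_at timestamp, then the API key.
row = next(csv.reader(StringIO("2023-11-27T06:42:47+0000,some_api_key")))
log = SSEAccessLogs(*row)

assert log.generated_at == "2023-11-27T06:42:47+0000"
assert log.api_key == "some_api_key"
```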
25 changes: 24 additions & 1 deletion api/sse/sse_service.py
@@ -1,8 +1,16 @@
import csv
from functools import wraps
from io import StringIO
from typing import Generator

import boto3
import gnupg
from django.conf import settings

from sse import tasks
from sse.dataclasses import SSEAccessLogs

s3 = boto3.resource("s3")


def _sse_enabled(get_project_from_first_arg=lambda obj: obj.project):
@@ -43,3 +51,18 @@ def send_environment_update_message_for_environment(environment):
tasks.send_environment_update_message.delay(
args=(environment.api_key, environment.updated_at.isoformat())
)


def stream_access_logs() -> Generator[SSEAccessLogs, None, None]:
gpg = gnupg.GPG()
bucket = s3.Bucket(settings.AWS_SSE_LOGS_BUCKET_NAME)
for log_file in bucket.objects.all():
encrypted_body = log_file.get()["Body"].read()
decrypted_body = gpg.decrypt(encrypted_body)

reader = csv.reader(StringIO(decrypted_body.data.decode()))

for row in reader:
yield SSEAccessLogs(*row)

log_file.delete()
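Two things worth noting about this generator: it decrypts each object with the key imported in the Dockerfile, and it is destructive — every S3 object is deleted once its rows have been yielded, so a single pass drains the bucket. A hedged sketch of a consumer, mirroring the aggregation `update_sse_usage` performs below (names invented):

```python
from collections import Counter

from sse.sse_service import stream_access_logs

request_counts: Counter[str] = Counter()
last_event_at: dict[str, str] = {}

# One pass only: each S3 object is deleted once its rows are consumed.
for log in stream_access_logs():
    request_counts[log.api_key] += 1
    last_event_at[log.api_key] = log.generated_at
```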
58 changes: 57 additions & 1 deletion api/sse/tasks.py
@@ -1,11 +1,23 @@
import logging
from datetime import timedelta

import requests
from app_analytics.influxdb_wrapper import influxdb_client
from django.conf import settings
from influxdb_client import Point, WriteOptions

from environments.models import Environment
from projects.models import Project
from sse import sse_service
from task_processor.decorators import (
register_recurring_task,
register_task_handler,
)

from .exceptions import SSEAuthTokenNotSet

logger = logging.getLogger(__name__)


@register_task_handler()
def send_environment_update_message_for_project(
@@ -27,6 +39,50 @@ def send_environment_update_message(environment_key: str, updated_at):
response.raise_for_status()


if settings.AWS_SSE_LOGS_BUCKET_NAME:

@register_recurring_task(
run_every=timedelta(minutes=5),
)
def update_sse_usage():
agg_request_count: dict[str, int] = {}
agg_last_event_generated_at: dict[str, str] = {}

for log in sse_service.stream_access_logs():
agg_request_count[log.api_key] = agg_request_count.get(log.api_key, 0) + 1
agg_last_event_generated_at[log.api_key] = log.generated_at

with influxdb_client.write_api(
write_options=WriteOptions(batch_size=100, flush_interval=1000)
) as write_api:
environments = Environment.objects.filter(
api_key__in=agg_request_count.keys()
).values("api_key", "id", "project_id", "project__organisation_id")

for environment in environments:
record = _get_influx_point(
environment["id"],
environment["project_id"],
environment["project__organisation_id"],
agg_request_count[environment["api_key"]],
agg_last_event_generated_at[environment["api_key"]],
)
write_api.write(bucket=settings.SSE_INFLUXDB_BUCKET, record=record)


def _get_influx_point(
environment_id: int, project_id: int, organisation_id: int, count: int, time: str
) -> Point:
return (
Point("sse_call")
.field("request_count", count)
.tag("organisation_id", organisation_id)
.tag("project_id", project_id)
.tag("environment_id", environment_id)
.time(time)
)


def get_auth_header():
if not settings.SSE_AUTHENTICATION_TOKEN:
raise SSEAuthTokenNotSet()
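For reference, the point built by `_get_influx_point` serialises to InfluxDB line protocol with tags sorted alphabetically. A sketch with invented IDs (the exact timestamp integer depends on the write precision):

```python
from influxdb_client import Point

point = (
    Point("sse_call")
    .field("request_count", 42)
    .tag("organisation_id", 1)
    .tag("project_id", 2)
    .tag("environment_id", 3)
    .time("2023-11-27T06:42:47+0000")
)

# Prints roughly:
# sse_call,environment_id=3,organisation_id=1,project_id=2 request_count=42i 1701067367000000000
print(point.to_line_protocol())
```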
69 changes: 69 additions & 0 deletions api/tests/unit/sse/test_sse_service.py
@@ -1,9 +1,17 @@
import boto3
import pytest
from django.conf import settings
from moto import mock_s3
from moto.core import patch_resource
from pytest_lazyfixture import lazy_fixture
from pytest_mock import MockerFixture

from sse.dataclasses import SSEAccessLogs
from sse.sse_service import (
s3,
send_environment_update_message_for_environment,
send_environment_update_message_for_project,
stream_access_logs,
)


@@ -96,3 +104,64 @@ def test_send_environment_update_message_for_environment_schedules_task_correctl
realtime_enabled_project_environment_one.updated_at.isoformat(),
)
)


@mock_s3
def test_stream_access_logs(mocker: MockerFixture):
# Given - Some test data
first_log = SSEAccessLogs("2023-11-27T06:42:47+0000", "key_one")
second_log = SSEAccessLogs("2023-11-27T06:42:47+0000", "key_two")
third_log = SSEAccessLogs("2023-11-27T06:42:47+0000", "key_three")

first_encrypted_object_data = b"first_bucket_encrypted_data"
first_decrypted_object_data = (
f"{first_log.generated_at},{first_log.api_key}\n"
f"{second_log.generated_at},{second_log.api_key}".encode()
)
second_encrypted_object_data = b"second_bucket_encrypted_data"
second_decrypted_object_data = (
f"{third_log.generated_at},{third_log.api_key}".encode()
)

# patch the s3 resource because it was created before the mock_s3 decorator was applied
# ref: https://docs.getmoto.org/en/latest/docs/getting_started.html#patching-the-client-or-resource
patch_resource(s3)

# Next, let's create a bucket
bucket_name = settings.AWS_SSE_LOGS_BUCKET_NAME
s3_client = boto3.client("s3", region_name="eu-west-2")
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
)
# put some objects
s3_client.put_object(
Body=first_encrypted_object_data, Bucket=bucket_name, Key="first_object"
)
s3_client.put_object(
Body=second_encrypted_object_data, Bucket=bucket_name, Key="second_object"
)

mocked_gpg = mocker.patch("sse.sse_service.gnupg.GPG", autospec=True)

mocked_gpg.return_value.decrypt.side_effect = [
mocker.MagicMock(data=first_decrypted_object_data),
mocker.MagicMock(data=second_decrypted_object_data),
]

# When
access_logs = list(stream_access_logs())

# Then
assert access_logs == [first_log, second_log, third_log]

# gpg decrypt was called correctly
mocked_gpg.return_value.decrypt.assert_has_calls(
[
mocker.call(first_encrypted_object_data),
mocker.call(second_encrypted_object_data),
]
)

# And, bucket is now empty
assert "Contents" not in s3_client.list_objects(Bucket=bucket_name)
