feat(sse): track usage #3050

Merged: 18 commits, Dec 6, 2023

Changes from all commits
8 changes: 7 additions & 1 deletion .github/actions/api-deploy-ecs/action.yml
@@ -62,6 +62,10 @@ inputs:
description: The flagsmith-rbac git revision to use when building deployment package.
required: false
default: main
sse_pgp_private_key:
description: Private PGP key used for encrypting/decrypting access logs
required: true

outputs:
api_ecr_image_url:
description: The image URL from ECR
@@ -147,9 +151,11 @@ runs:
ECR_REPOSITORY: ${{ inputs.aws_ecr_repository_arn }}
DOCKER_BUILDKIT: '1'
run: |
echo "Load pgp private key"
cat <<< ${{ inputs.sse_pgp_private_key }} > sse_pgp_pkey
echo "Building docker image with URL: "
echo $ECR_REPOSITORY:$IMAGE_TAG
docker build -t $ECR_REPOSITORY:$IMAGE_TAG -f api/Dockerfile --build-arg SAML_INSTALLED=1 --build-arg POETRY_OPTS="--with saml,auth-controller" .
docker build --secret id=sse_pgp_pkey,src=./sse_pgp_pkey -t $ECR_REPOSITORY:$IMAGE_TAG -f api/Dockerfile --build-arg SAML_INSTALLED=1 --build-arg POETRY_OPTS="--with saml,auth-controller" .
docker push $ECR_REPOSITORY:$IMAGE_TAG
echo "image=$ECR_REPOSITORY:$IMAGE_TAG" >> $GITHUB_OUTPUT
shell: bash
1 change: 1 addition & 0 deletions .github/workflows/api-deploy-production-ecs.yml
@@ -50,6 +50,7 @@ jobs:
aws_ecs_service_name: flagsmith-task-processor-svc-eu-west-2-bf77140
aws_task_definitions_directory_path: infrastructure/aws/production
api_ecr_image_url: ${{ steps.deploy-api.outputs.api_ecr_image_url }}
sse_pgp_private_key: ${{ secrets.SSE_PGP_PRIVATE_KEY }}

run-tests:
runs-on: ubuntu-latest
1 change: 1 addition & 0 deletions .github/workflows/api-deploy-staging-ecs.yml
@@ -51,6 +51,7 @@ jobs:
aws_ecs_service_name: flagsmith-task-processor-svc-eu-west-2-792c644
aws_task_definitions_directory_path: infrastructure/aws/staging
api_ecr_image_url: ${{ steps.deploy-api.outputs.api_ecr_image_url }}
sse_pgp_private_key: ${{ secrets.SSE_PGP_PRIVATE_KEY }}

run-tests:
runs-on: ubuntu-latest
7 changes: 7 additions & 0 deletions api/.env-ci
@@ -1,3 +1,10 @@
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
ANALYTICS_DATABASE_URL=postgres://postgres:postgres@localhost:5432/analytics
PYTEST_ADDOPTS=--cov . --cov-report xml -n auto --dist worksteal

# used by moto # ref https://github.com/getmoto/moto/issues/5941
AWS_ACCESS_KEY_ID=testing
AWS_SECRET_ACCESS_KEY=testing
AWS_SECURITY_TOKEN=testing
AWS_SESSION_TOKEN=testing
AWS_DEFAULT_REGION=eu-east-2
5 changes: 5 additions & 0 deletions api/Dockerfile
@@ -29,6 +29,11 @@ RUN if [ "${SAML_INSTALLED}" = "1" ]; then apt-get update && apt-get install -y
ARG TARGETARCH
RUN if [ "${TARGETARCH}" != "amd64" ]; then apt-get update && apt-get install -y libpq-dev && rm -rf /var/lib/apt/lists/*; fi;

# Install GnuPG (and import the private key) if the secret file exists
RUN --mount=type=secret,id=sse_pgp_pkey if [ -f /run/secrets/sse_pgp_pkey ]; then \
    apt-get update && apt-get install -y gnupg && gpg --import /run/secrets/sse_pgp_pkey; fi;


# Copy the python venv from step 2
COPY --from=build-python /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
# Copy the bin folder as well to copy the executables created in package installation
3 changes: 3 additions & 0 deletions api/app/settings/common.py
@@ -907,6 +907,9 @@
# Real time(server sent events) settings
SSE_SERVER_BASE_URL = env.str("SSE_SERVER_BASE_URL", None)
SSE_AUTHENTICATION_TOKEN = env.str("SSE_AUTHENTICATION_TOKEN", None)
AWS_SSE_LOGS_BUCKET_NAME = env.str("AWS_SSE_LOGS_BUCKET_NAME", None)
SSE_INFLUXDB_BUCKET = env.str("SSE_INFLUXDB_BUCKET", None)


DISABLE_INVITE_LINKS = env.bool("DISABLE_INVITE_LINKS", False)

2 changes: 2 additions & 0 deletions api/app/settings/test.py
@@ -11,3 +11,5 @@
"signup": "100/min",
"user": "100000/day",
}

AWS_SSE_LOGS_BUCKET_NAME = "test_bucket"
27 changes: 24 additions & 3 deletions api/poetry.lock


1 change: 1 addition & 0 deletions api/pyproject.toml
@@ -102,6 +102,7 @@ django-axes = "~5.32.0"
pydantic = "~1.10.9"
pyngo = "~1.6.0"
flagsmith = "^3.4.0"
python-gnupg = "^0.5.1"

[tool.poetry.group.auth-controller]
optional = true
7 changes: 7 additions & 0 deletions api/sse/dataclasses.py
@@ -0,0 +1,7 @@
from dataclasses import dataclass


@dataclass(eq=True)
class SSEAccessLogs:
generated_at: str # ISO 8601
api_key: str
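
The field order of the dataclass mirrors the column order of the decrypted CSV log files, which is why the service code can unpack each row positionally with `SSEAccessLogs(*row)`. A minimal, self-contained sketch of that round trip (the dataclass is duplicated here for illustration; the real one lives in `api/sse/dataclasses.py`):

```python
import csv
from dataclasses import dataclass
from io import StringIO


# Local copy for illustration only
@dataclass(eq=True)
class SSEAccessLogs:
    generated_at: str  # ISO 8601
    api_key: str


# One CSV row per SSE request: generated_at,api_key — unpacked positionally
rows = csv.reader(StringIO("2023-11-27T06:42:47+0000,key_one"))
logs = [SSEAccessLogs(*row) for row in rows]
```

Because `eq=True` generates `__eq__`, parsed instances compare equal field-by-field, which is what the unit test below relies on when asserting `access_logs == [...]`.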
25 changes: 24 additions & 1 deletion api/sse/sse_service.py
@@ -1,8 +1,16 @@
import csv
from functools import wraps
from io import StringIO
from typing import Generator

import boto3
import gnupg
from django.conf import settings

from . import tasks
from sse import tasks
from sse.dataclasses import SSEAccessLogs

s3 = boto3.resource("s3")


def _sse_enabled(get_project_from_first_arg=lambda obj: obj.project):
@@ -43,3 +51,18 @@ def send_environment_update_message_for_environment(environment)
tasks.send_environment_update_message.delay(
args=(environment.api_key, environment.updated_at.isoformat())
)


def stream_access_logs() -> Generator[SSEAccessLogs, None, None]:
gpg = gnupg.GPG()
bucket = s3.Bucket(settings.AWS_SSE_LOGS_BUCKET_NAME)
for log_file in bucket.objects.all():
encrypted_body = log_file.get()["Body"].read()
decrypted_body = gpg.decrypt(encrypted_body)

reader = csv.reader(StringIO(decrypted_body.data.decode()))

for row in reader:
yield SSEAccessLogs(*row)

log_file.delete()
58 changes: 57 additions & 1 deletion api/sse/tasks.py
@@ -1,11 +1,23 @@
import logging
from datetime import timedelta

import requests
from app_analytics.influxdb_wrapper import influxdb_client
from django.conf import settings
from influxdb_client import Point, WriteOptions

from environments.models import Environment
from projects.models import Project
from task_processor.decorators import register_task_handler
from sse import sse_service
from task_processor.decorators import (
register_recurring_task,
register_task_handler,
)

from .exceptions import SSEAuthTokenNotSet

logger = logging.getLogger(__name__)


@register_task_handler()
def send_environment_update_message_for_project(
@@ -27,6 +39,50 @@ def send_environment_update_message(environment_key: str, updated_at):
response.raise_for_status()


if settings.AWS_SSE_LOGS_BUCKET_NAME:

@register_recurring_task(
run_every=timedelta(minutes=5),
)
def update_sse_usage():
agg_request_count: dict[str, int] = {}
agg_last_event_generated_at: dict[str, str] = {}

for log in sse_service.stream_access_logs():
agg_request_count[log.api_key] = agg_request_count.get(log.api_key, 0) + 1
agg_last_event_generated_at[log.api_key] = log.generated_at

with influxdb_client.write_api(
write_options=WriteOptions(batch_size=100, flush_interval=1000)
) as write_api:
environments = Environment.objects.filter(
api_key__in=agg_request_count.keys()
).values("api_key", "id", "project_id", "project__organisation_id")

for environment in environments:
record = _get_influx_point(
environment["id"],
environment["project_id"],
environment["project__organisation_id"],
agg_request_count[environment["api_key"]],
agg_last_event_generated_at[environment["api_key"]],
)
write_api.write(bucket=settings.SSE_INFLUXDB_BUCKET, record=record)


def _get_influx_point(
environment_id: int, project_id: int, organisation_id: int, count: int, time: str
) -> Point:
return (
Point("sse_call")
.field("request_count", count)
.tag("organisation_id", organisation_id)
.tag("project_id", project_id)
.tag("environment_id", environment_id)
.time(time)
)


def get_auth_header():
if not settings.SSE_AUTHENTICATION_TOKEN:
raise SSEAuthTokenNotSet()
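
The `update_sse_usage` task above reduces the log stream to two per-key aggregates before writing to InfluxDB: a request count, and the `generated_at` of the last event seen for each key. That reduction can be sketched in isolation (the tuple input is illustrative; the real task iterates `SSEAccessLogs` records):

```python
from typing import Dict, Iterable, Tuple


def aggregate(
    logs: Iterable[Tuple[str, str]]
) -> Tuple[Dict[str, int], Dict[str, str]]:
    # Each log is (generated_at, api_key); stream order decides "last seen"
    request_count: Dict[str, int] = {}
    last_event_generated_at: Dict[str, str] = {}
    for generated_at, api_key in logs:
        request_count[api_key] = request_count.get(api_key, 0) + 1
        last_event_generated_at[api_key] = generated_at
    return request_count, last_event_generated_at
```

One point in the diff worth noting: "last seen" here is last in stream order, not last by timestamp, so it is only the most recent event if the bucket yields log files in chronological order.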
69 changes: 69 additions & 0 deletions api/tests/unit/sse/test_sse_service.py
@@ -1,9 +1,17 @@
import boto3
import pytest
from django.conf import settings
from moto import mock_s3
from moto.core import patch_resource
from pytest_lazyfixture import lazy_fixture
from pytest_mock import MockerFixture

from sse.dataclasses import SSEAccessLogs
from sse.sse_service import (
s3,
send_environment_update_message_for_environment,
send_environment_update_message_for_project,
stream_access_logs,
)


@@ -96,3 +104,64 @@ def test_send_environment_update_message_for_environment_schedules_task_correctl
realtime_enabled_project_environment_one.updated_at.isoformat(),
)
)


@mock_s3
def test_stream_access_logs(mocker: MockerFixture):
# Given - Some test data
first_log = SSEAccessLogs("2023-11-27T06:42:47+0000", "key_one")
second_log = SSEAccessLogs("2023-11-27T06:42:47+0000", "key_two")
third_log = SSEAccessLogs("2023-11-27T06:42:47+0000", "key_three")

first_encrypted_object_data = b"first_bucket_encrypted_data"
first_decrypted_object_data = (
f"{first_log.generated_at},{first_log.api_key}\n"
f"{second_log.generated_at},{second_log.api_key}".encode()
)
second_encrypted_object_data = b"second_bucket_encrypted_data"
second_decrypted_object_data = (
f"{third_log.generated_at},{third_log.api_key}".encode()
)

# patch the s3 resource because it was created before the mock_s3 decorator was applied
# ref: https://docs.getmoto.org/en/latest/docs/getting_started.html#patching-the-client-or-resource
patch_resource(s3)

# Next, let's create a bucket
bucket_name = settings.AWS_SSE_LOGS_BUCKET_NAME
s3_client = boto3.client("s3", region_name="eu-west-2")
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
)
# put some objects
s3_client.put_object(
Body=first_encrypted_object_data, Bucket=bucket_name, Key="first_object"
)
s3_client.put_object(
Body=second_encrypted_object_data, Bucket=bucket_name, Key="second_object"
)

mocked_gpg = mocker.patch("sse.sse_service.gnupg.GPG", autospec=True)

mocked_gpg.return_value.decrypt.side_effect = [
mocker.MagicMock(data=first_decrypted_object_data),
mocker.MagicMock(data=second_decrypted_object_data),
]

# When
access_logs = list(stream_access_logs())

# Then
assert access_logs == [first_log, second_log, third_log]

# gpg decrypt was called correctly
mocked_gpg.return_value.decrypt.assert_has_calls(
[
mocker.call(first_encrypted_object_data),
mocker.call(second_encrypted_object_data),
]
)

# And, bucket is now empty
assert "Contents" not in s3_client.list_objects(Bucket=bucket_name)