feat(analytics/usage-data): Add ability to store usage data in Postgres (#1849)

* feat(analytics-tracking): add data model

* update datamodel

* Add tasks

* merge migrations

* use different database for analytics data

* Add middleware for storing analytics locally

* update get_from_resource_name

* skip tests if analytics database is not configured

* Add analytics db url

* Add analytics environment variables to processor

* use a different model to register recurring tasks

* Add validation check for overlapping buckets

* Add recurring task to admin

* Add tests for recurring tasks

* Add test for the decorator

* update migrate_analytics_db to migrate only if db exists

* Add comment

gagantrivedi authored Mar 3, 2023
1 parent 627ef8a commit 6b21513
Showing 45 changed files with 2,114 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .github/actions/api-deploy-ecs/action.yml
@@ -220,7 +220,7 @@ runs:
},
"InputTransformer": {
"InputPathsMap":{"project_id":"$.detail.project_id"},
"InputTemplate": "{ \"containerOverrides\": [ { \"name\": \"flagsmith-api-migration\", \"command\": [\"migrate_identities\", <project_id>], \" environment \":[{\"PROJECT_METADATA_TABLE_NAME_DYNAMO\":\"flagsmith_project_metadata\"}]}]}"
"InputTemplate": "{ \"containerOverrides\": [ { \"name\": \"flagsmith-api-migration\", \"command\": [\"migrate-identities\", <project_id>], \" environment \":[{\"PROJECT_METADATA_TABLE_NAME_DYNAMO\":\"flagsmith_project_metadata\"}]}]}"
}
}
]'
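For context on the one-line change above (migrate_identities → migrate-identities): EventBridge's InputTransformer extracts $.detail.project_id via InputPathsMap and splices it into the <project_id> placeholder of InputTemplate. Assuming an event whose detail.project_id is the number 123, the rendered override would look roughly like this (illustrative Python literal, not generated output):

# Approximate render of the InputTemplate above for detail.project_id == 123.
rendered_input = {
    "containerOverrides": [
        {
            "name": "flagsmith-api-migration",
            "command": ["migrate-identities", 123],
            # Key reproduced verbatim from the template, including its spaces.
            " environment ": [
                {"PROJECT_METADATA_TABLE_NAME_DYNAMO": "flagsmith_project_metadata"}
            ],
        }
    ]
}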
6 changes: 6 additions & 0 deletions .github/workflows/api-pull-request.yml
@@ -59,9 +59,15 @@ jobs:
- name: Check isort imports
run: isort --check .

- name: Create analytics database
env:
PGPASSWORD: postgres
run: createdb -h localhost -U postgres -p 5432 analytics

- name: Run Tests
env:
DATABASE_URL: "postgres://postgres:postgres@localhost:${{ job.services.postgres.ports[5432] }}/postgres"
ANALYTICS_DATABASE_URL: "postgres://postgres:postgres@localhost:${{ job.services.postgres.ports[5432] }}/analytics"
DJANGO_SETTINGS_MODULE: "app.settings.test"
LOG_LEVEL: "INFO"
run: |
42 changes: 40 additions & 2 deletions api/app/routers.py
@@ -28,7 +28,45 @@ def allow_relation(self, obj1, obj2, **hints):
return None

def allow_migrate(self, db, app_label, model_name=None, **hints):
return db == "default"


class AnalyticsRouter:
route_app_labels = ["app_analytics"]

def db_for_read(self, model, **hints):
"""
All non-auth models end up in this pool.
Attempts to read analytics models go to 'analytics' database.
"""
return db == "default"
if model._meta.app_label in self.route_app_labels:
return "analytics"
return None

def db_for_write(self, model, **hints):
"""
Attempts to write analytics models go to 'analytics' database.
"""
if model._meta.app_label in self.route_app_labels:
return "analytics"
return None

def allow_relation(self, obj1, obj2, **hints):
"""
Relations between objects are allowed if both objects are
in the analytics database.
"""
if (
obj1._meta.app_label in self.route_app_labels
and obj2._meta.app_label in self.route_app_labels
):
return True
return None

def allow_migrate(self, db, app_label, model_name=None, **hints):
"""
        Make sure the analytics app only appears in the 'analytics' database.
"""
if app_label in self.route_app_labels:
if db != "default":
return db == "analytics"
return None
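Django consults each entry in DATABASE_ROUTERS in order and takes the first non-None answer, so inserting AnalyticsRouter at index 0 (see the settings change below) lets it claim app_analytics models while deferring everything else to PrimaryReplicaRouter. A minimal sketch of the resulting behaviour, assuming the analytics database is configured; Organisation is just a stand-in for any non-analytics model:

# Sketch only, not part of the diff.
from django.db import router  # Django's ConnectionRouter instance

from app_analytics.models import APIUsageBucket
from organisations.models import Organisation

# AnalyticsRouter matches the "app_analytics" app label and short-circuits.
assert router.db_for_read(APIUsageBucket) == "analytics"

# For other apps AnalyticsRouter returns None, so the decision falls through
# to PrimaryReplicaRouter, which returns "default" or a replica alias.
assert router.db_for_read(Organisation) != "analytics"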
33 changes: 29 additions & 4 deletions api/app/settings/common.py
@@ -67,6 +67,8 @@
ALLOWED_HOSTS = env.list("DJANGO_ALLOWED_HOSTS", default=[])
USE_X_FORWARDED_HOST = env.bool("USE_X_FORWARDED_HOST", default=False)

USE_POSTGRES_FOR_ANALYTICS = env.bool("USE_POSTGRES_FOR_ANALYTICS", default=False)

CSRF_TRUSTED_ORIGINS = env.list("DJANGO_CSRF_TRUSTED_ORIGINS", default=[])

INTERNAL_IPS = ["127.0.0.1"]
@@ -156,16 +158,16 @@
"task_processor",
"softdelete",
"metadata",
"app_analytics",
]

if GOOGLE_ANALYTICS_KEY or INFLUXDB_TOKEN:
INSTALLED_APPS.append("app_analytics")

SITE_ID = 1

db_conn_max_age = env.int("DJANGO_DB_CONN_MAX_AGE", 60)
DJANGO_DB_CONN_MAX_AGE = None if db_conn_max_age == -1 else db_conn_max_age

DATABASE_ROUTERS = ["app.routers.PrimaryReplicaRouter"]
NUM_DB_REPLICAS = 0
# Allows collectstatic to run without a database, mainly so Docker builds can run collectstatic at build time
if "DATABASE_URL" in os.environ:
DATABASES = {
@@ -182,7 +184,12 @@
DATABASES[f"replica_{i}"] = dj_database_url.parse(
db_url, conn_max_age=DJANGO_DB_CONN_MAX_AGE
)
DATABASE_ROUTERS = ("app.routers.PrimaryReplicaRouter",)

if "ANALYTICS_DATABASE_URL" in os.environ:
DATABASES["analytics"] = dj_database_url.parse(
env("ANALYTICS_DATABASE_URL"), conn_max_age=DJANGO_DB_CONN_MAX_AGE
)
DATABASE_ROUTERS.insert(0, "app.routers.AnalyticsRouter")
elif "DJANGO_DB_NAME" in os.environ:
# If there is no DATABASE_URL configured, check for old style DB config parameters
DATABASES = {
@@ -196,6 +203,18 @@
"CONN_MAX_AGE": DJANGO_DB_CONN_MAX_AGE,
},
}
if "DJANGO_DB_NAME_ANALYTICS" in os.environ:
DATABASES["analytics"] = {
"ENGINE": "django.db.backends.postgresql",
"NAME": os.environ["DJANGO_DB_NAME_ANALYTICS"],
"USER": os.environ["DJANGO_DB_USER_ANALYTICS"],
"PASSWORD": os.environ["DJANGO_DB_PASSWORD_ANALYTICS"],
"HOST": os.environ["DJANGO_DB_HOST_ANALYTICS"],
"PORT": os.environ["DJANGO_DB_PORT_ANALYTICS"],
"CONN_MAX_AGE": DJANGO_DB_CONN_MAX_AGE,
}

DATABASE_ROUTERS.insert(0, "app.routers.AnalyticsRouter")

LOGIN_THROTTLE_RATE = env("LOGIN_THROTTLE_RATE", "20/min")
SIGNUP_THROTTLE_RATE = env("SIGNUP_THROTTLE_RATE", "10000/min")
@@ -263,6 +282,12 @@
if INFLUXDB_TOKEN:
MIDDLEWARE.append("app_analytics.middleware.InfluxDBMiddleware")

if USE_POSTGRES_FOR_ANALYTICS:
if INFLUXDB_TOKEN:
raise RuntimeError("Cannot use both InfluxDB and Postgres for analytics")

MIDDLEWARE.append("app_analytics.middleware.APIUsageMiddleware")

ALLOWED_ADMIN_IP_ADDRESSES = env.list("ALLOWED_ADMIN_IP_ADDRESSES", default=list())
if len(ALLOWED_ADMIN_IP_ADDRESSES) > 0:
warnings.warn(
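Both configuration paths above (DATABASE_URL-style URLs and the discrete DJANGO_DB_*_ANALYTICS variables) converge on the same shape. A sketch of the effective settings once the analytics database is configured, with connection details elided:

# Effective configuration (sketch; {...} stands for parsed connection settings).
DATABASES = {
    "default": {...},  # from DATABASE_URL or DJANGO_DB_* variables
    "analytics": {...},  # from ANALYTICS_DATABASE_URL or DJANGO_DB_*_ANALYTICS
}
# AnalyticsRouter was inserted at index 0, so it is consulted first.
DATABASE_ROUTERS = [
    "app.routers.AnalyticsRouter",
    "app.routers.PrimaryReplicaRouter",
]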
133 changes: 133 additions & 0 deletions api/app_analytics/analytics_db_service.py
@@ -0,0 +1,133 @@
from datetime import date, timedelta
from typing import List

from app_analytics.dataclasses import FeatureEvaluationData, UsageData
from app_analytics.influxdb_wrapper import get_events_for_organisation
from app_analytics.influxdb_wrapper import (
get_feature_evaluation_data as get_feature_evaluation_data_from_influxdb,
)
from app_analytics.influxdb_wrapper import (
get_usage_data as get_usage_data_from_influxdb,
)
from app_analytics.models import (
APIUsageBucket,
FeatureEvaluationBucket,
Resource,
)
from django.conf import settings
from django.db.models import Sum
from django.utils import timezone

from environments.models import Environment
from features.models import Feature

ANALYTICS_READ_BUCKET_SIZE = 15


def get_usage_data(
organisation, environment_id=None, project_id=None
) -> List[UsageData]:
if settings.USE_POSTGRES_FOR_ANALYTICS:
return get_usage_data_from_local_db(
organisation, environment_id=environment_id, project_id=project_id
)
return get_usage_data_from_influxdb(
organisation, environment_id=environment_id, project_id=project_id
)


def get_usage_data_from_local_db(
organisation, environment_id=None, project_id=None, period: int = 30
) -> List[UsageData]:
qs = APIUsageBucket.objects.filter(
environment_id__in=_get_environment_ids_for_org(organisation),
bucket_size=ANALYTICS_READ_BUCKET_SIZE,
)
if project_id:
qs = qs.filter(project_id=project_id)
if environment_id:
qs = qs.filter(environment_id=environment_id)

qs = (
qs.filter(
created_at__date__lte=timezone.now(),
            created_at__date__gt=timezone.now() - timedelta(days=period),
)
.order_by("created_at")
.values("created_at__date", "resource")
.annotate(count=Sum("total_count"))
)
data_by_day = {}
for row in qs:
day = row["created_at__date"]
if day not in data_by_day:
data_by_day[day] = UsageData(day=day)
setattr(
data_by_day[day],
Resource.get_lowercased_name(row["resource"]),
row["count"],
)

    return list(data_by_day.values())


def get_total_events_count(organisation) -> int:
"""
Return total number of events for an organisation in the last 30 days
"""
if settings.USE_POSTGRES_FOR_ANALYTICS:
count = APIUsageBucket.objects.filter(
environment_id__in=_get_environment_ids_for_org(organisation),
created_at__date__lte=date.today(),
created_at__date__gt=date.today() - timedelta(days=30),
bucket_size=ANALYTICS_READ_BUCKET_SIZE,
).aggregate(total_count=Sum("total_count"))["total_count"]
else:
count = get_events_for_organisation(organisation.id)
return count


def get_feature_evaluation_data(
feature: Feature, environment_id: int, period: int = 30
) -> List[FeatureEvaluationData]:
if settings.USE_POSTGRES_FOR_ANALYTICS:
return get_feature_evaluation_data_from_local_db(
feature, environment_id, period
)
return get_feature_evaluation_data_from_influxdb(
feature_name=feature.name, environment_id=environment_id, period=f"{period}d"
)


def get_feature_evaluation_data_from_local_db(
feature: Feature, environment_id: int, period: int = 30
) -> List[FeatureEvaluationData]:
    feature_evaluation_data = (
        FeatureEvaluationBucket.objects.filter(
            feature_name=feature.name,
            environment_id=environment_id,
            bucket_size=ANALYTICS_READ_BUCKET_SIZE,
            created_at__date__lte=timezone.now(),
            created_at__date__gt=timezone.now() - timedelta(days=period),
        )
.order_by("created_at")
.values("created_at__date", "feature_name", "environment_id")
.annotate(count=Sum("total_count"))
)
usage_list = []
for data in feature_evaluation_data:
usage_list.append(
FeatureEvaluationData(
day=data["created_at__date"],
count=data["count"],
)
)
return usage_list


def _get_environment_ids_for_org(organisation) -> List[int]:
# We need to do this to prevent Django from generating a query that
# references the environments and projects tables,
# as they do not exist in the analytics database.
return [
e.id for e in Environment.objects.filter(project__organisation=organisation)
]
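A hypothetical caller, to illustrate the dispatch: each helper above reads from Postgres when USE_POSTGRES_FOR_ANALYTICS is set and falls back to InfluxDB otherwise. The report function itself is invented; only the imported names come from the module:

from app_analytics.analytics_db_service import (
    get_feature_evaluation_data,
    get_total_events_count,
    get_usage_data,
)


def print_usage_report(organisation, feature, environment_id: int) -> None:
    # Per-day UsageData rows for the organisation's last 30 days.
    for day_data in get_usage_data(organisation):
        print(day_data.day, day_data.flags, day_data.traits, day_data.identities)
    # Single aggregate over the same 30-day window.
    print("30-day total:", get_total_events_count(organisation))
    # Per-day evaluation counts for one feature over the last 7 days.
    for row in get_feature_evaluation_data(feature, environment_id, period=7):
        print(row.day, row.count)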
6 changes: 6 additions & 0 deletions api/app_analytics/apps.py
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class AppAnalyticsConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "app_analytics"
17 changes: 17 additions & 0 deletions api/app_analytics/dataclasses.py
@@ -0,0 +1,17 @@
from dataclasses import dataclass
from datetime import date


@dataclass
class UsageData:
day: date
flags: int = 0
traits: int = 0
identities: int = 0
environment_document: int = 0


@dataclass
class FeatureEvaluationData:
day: date
count: int = 0
45 changes: 45 additions & 0 deletions api/app_analytics/influxdb_schema.py
@@ -0,0 +1,45 @@
from marshmallow import EXCLUDE, Schema, fields, post_load, pre_load

from .dataclasses import FeatureEvaluationData, UsageData


class FeatureEvaluationDataSchema(Schema):
count = fields.Integer(allow_none=True)
day = fields.Date(allow_none=True)

class Meta:
unknown = EXCLUDE

@pre_load
def preprocess(self, data, **kwargs):
# the data returned by influx db looks like this:
# {
# "datetime": "2021-01-01",
# "some_feature_name": 10
# }
day = data.pop("datetime")
# Use popitem because we don't know the name of the feature
# and it's the only item left in the dict
_, count = data.popitem()
return {"day": day, "count": count}

@post_load
def make_usage_data(self, data, **kwargs):
return FeatureEvaluationData(**data)


class UsageDataSchema(Schema):
flags = fields.Integer(data_key="Flags", allow_none=True)
traits = fields.Integer(data_key="Traits", allow_none=True)
identities = fields.Integer(data_key="Identities", allow_none=True)
environment_document = fields.Integer(
data_key="Environment-document", allow_none=True
)
day = fields.Date(data_key="name", allow_none=True)

class Meta:
unknown = EXCLUDE

@post_load
def make_usage_data(self, data, **kwargs):
return UsageData(**data)
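An example load with invented sample rows, showing how the schemas normalise raw InfluxDB output into the dataclasses defined earlier:

from datetime import date

# A feature-evaluation row: one "datetime" key plus a single column named
# after the feature, whatever that name happens to be.
evaluation = FeatureEvaluationDataSchema().load(
    {"datetime": "2021-01-01", "my_feature": 10}
)
assert evaluation == FeatureEvaluationData(day=date(2021, 1, 1), count=10)

# A usage row; columns missing from the row fall back to dataclass defaults.
usage = UsageDataSchema().load(
    {"name": "2021-01-01", "Flags": 5, "Traits": 2, "Identities": 3}
)
assert usage.flags == 5 and usage.environment_document == 0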