Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(app_analytics): Add cache for feature evaluation #4418

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@
USE_POSTGRES_FOR_ANALYTICS = env.bool("USE_POSTGRES_FOR_ANALYTICS", default=False)
USE_CACHE_FOR_USAGE_DATA = env.bool("USE_CACHE_FOR_USAGE_DATA", default=False)

# How long (in seconds) feature evaluation counts are buffered in memory
# (see app_analytics.cache.FeatureEvaluationCache) before being flushed to
# the analytics backend (Postgres task or InfluxDB).
FEATURE_EVALUATION_CACHE_SECONDS = env.int(
    "FEATURE_EVALUATION_CACHE_SECONDS", default=60
)

ENABLE_API_USAGE_TRACKING = env.bool("ENABLE_API_USAGE_TRACKING", default=True)

if ENABLE_API_USAGE_TRACKING:
Expand Down
52 changes: 51 additions & 1 deletion api/app_analytics/cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from app_analytics.tasks import track_request
from app_analytics.tasks import track_feature_evaluation, track_request
from app_analytics.track import track_feature_evaluation_influxdb
from django.conf import settings
from django.utils import timezone

CACHE_FLUSH_INTERVAL = 60 # seconds
Expand Down Expand Up @@ -31,3 +33,51 @@ def track_request(self, resource: int, host: str, environment_key: str):
self._cache[key] += 1
if (timezone.now() - self._last_flushed_at).seconds > CACHE_FLUSH_INTERVAL:
self._flush()


class FeatureEvaluationCache:
    """In-process buffer for feature evaluation counts.

    Counts are accumulated per ``(environment_id, feature_name)`` key and
    flushed to the configured analytics backend — the Postgres task when
    ``USE_POSTGRES_FOR_ANALYTICS`` is set, otherwise InfluxDB when an
    ``INFLUXDB_TOKEN`` is configured — once
    ``settings.FEATURE_EVALUATION_CACHE_SECONDS`` have elapsed since the
    last flush.

    NOTE(review): flushing only happens on the next ``track_feature_evaluation``
    call after the interval elapses; an idle process retains its buffered
    counts indefinitely. Not guarded by a lock — assumes single-threaded
    access per process; confirm against the deployment model.
    """

    def __init__(self):
        # (environment_id, feature_name) -> accumulated evaluation count
        self._cache: dict[tuple[int, str], int] = {}
        self._last_flushed_at = timezone.now()

    def _flush(self):
        """Regroup cached counts by environment and dispatch to the backend."""
        evaluation_data: dict[int, dict[str, int]] = {}
        for (environment_id, feature_name), eval_count in self._cache.items():
            evaluation_data.setdefault(environment_id, {})[feature_name] = eval_count

        for environment_id, feature_evaluations in evaluation_data.items():
            if settings.USE_POSTGRES_FOR_ANALYTICS:
                track_feature_evaluation.delay(
                    kwargs={
                        "environment_id": environment_id,
                        "feature_evaluations": feature_evaluations,
                    }
                )
            elif settings.INFLUXDB_TOKEN:
                track_feature_evaluation_influxdb.delay(
                    kwargs={
                        "environment_id": environment_id,
                        "feature_evaluations": feature_evaluations,
                    }
                )

        self._cache = {}
        self._last_flushed_at = timezone.now()

    def track_feature_evaluation(
        self, environment_id: int, feature_name: str, evaluation_count: int
    ):
        """Add ``evaluation_count`` to the buffer; flush if the interval elapsed."""
        key = (environment_id, feature_name)
        self._cache[key] = self._cache.get(key, 0) + evaluation_count

        # Bug fix: use total_seconds(), not the .seconds attribute.
        # timedelta.seconds excludes whole days (e.g. a gap of 1 day + 5s
        # reports 5), which would silently skip the flush after long idle
        # periods.
        if (
            timezone.now() - self._last_flushed_at
        ).total_seconds() > settings.FEATURE_EVALUATION_CACHE_SECONDS:
            self._flush()
34 changes: 7 additions & 27 deletions api/app_analytics/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,9 @@
get_total_events_count,
get_usage_data,
)
from app_analytics.tasks import (
track_feature_evaluation,
track_feature_evaluation_v2,
)
from app_analytics.track import (
track_feature_evaluation_influxdb,
track_feature_evaluation_influxdb_v2,
)
from app_analytics.cache import FeatureEvaluationCache
from app_analytics.tasks import track_feature_evaluation_v2
from app_analytics.track import track_feature_evaluation_influxdb_v2
from django.conf import settings
from drf_yasg.utils import swagger_auto_schema
from rest_framework import status
Expand All @@ -38,6 +33,7 @@
)

logger = logging.getLogger(__name__)
feature_evaluation_cache = FeatureEvaluationCache()


class SDKAnalyticsFlagsV2(CreateAPIView):
Expand Down Expand Up @@ -141,26 +137,10 @@ def post(self, request, *args, **kwargs):
content_type="application/json",
status=status.HTTP_200_OK,
)

if settings.USE_POSTGRES_FOR_ANALYTICS:
track_feature_evaluation.delay(
args=(
request.environment.id,
request.data,
)
for feature_name, eval_count in self.request.data.items():
feature_evaluation_cache.track_feature_evaluation(
request.environment.id, feature_name, eval_count
)
elif settings.INFLUXDB_TOKEN:
# Due to load issues on the task processor, we
# explicitly run this task in a separate thread.
# TODO: batch influx data to prevent large amounts
# of tasks.
track_feature_evaluation_influxdb.run_in_thread(
args=(
request.environment.id,
request.data,
)
)

return Response(status=status.HTTP_200_OK)

def _is_data_valid(self) -> bool:
Expand Down
96 changes: 95 additions & 1 deletion api/tests/unit/app_analytics/test_unit_app_analytics_cache.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from app_analytics.cache import CACHE_FLUSH_INTERVAL, APIUsageCache
from app_analytics.cache import (
CACHE_FLUSH_INTERVAL,
APIUsageCache,
FeatureEvaluationCache,
)
from app_analytics.models import Resource
from django.utils import timezone
from freezegun import freeze_time
from pytest_django.fixtures import SettingsWrapper
from pytest_mock import MockerFixture


Expand Down Expand Up @@ -71,3 +76,92 @@ def test_api_usage_cache(mocker: MockerFixture) -> None:

# finally, make sure track_request task was not called
assert not mocked_track_request_task.called


def test_feature_evaluation_cache(
    mocker: MockerFixture,
    settings: SettingsWrapper,
):
    """FeatureEvaluationCache buffers counts and flushes, after the configured
    interval, to InfluxDB by default and to the Postgres task when
    USE_POSTGRES_FOR_ANALYTICS is enabled."""
    # Given
    settings.FEATURE_EVALUATION_CACHE_SECONDS = 60
    settings.USE_POSTGRES_FOR_ANALYTICS = False
    settings.INFLUXDB_TOKEN = "token"

    # Patch the backend entry points where the cache module looks them up.
    mocked_track_evaluation_task = mocker.patch(
        "app_analytics.cache.track_feature_evaluation"
    )
    mocked_track_feature_evaluation_influxdb_task = mocker.patch(
        "app_analytics.cache.track_feature_evaluation_influxdb"
    )
    environment_1_id = 1
    environment_2_id = 2
    feature_1_name = "feature_1_name"
    feature_2_name = "feature_2_name"

    cache = FeatureEvaluationCache()
    now = timezone.now()

    with freeze_time(now) as frozen_time:
        # Track some feature evaluations (10 per environment/feature pair)
        for _ in range(10):
            cache.track_feature_evaluation(environment_1_id, feature_1_name, 1)
            cache.track_feature_evaluation(environment_1_id, feature_2_name, 1)
            cache.track_feature_evaluation(environment_2_id, feature_2_name, 1)

        # Make sure the internal tasks were not called before the interval
        assert not mocked_track_evaluation_task.delay.called
        assert not mocked_track_feature_evaluation_influxdb_task.delay.called

        # Now, let's move the time forward past the flush interval
        frozen_time.tick(settings.FEATURE_EVALUATION_CACHE_SECONDS + 1)

        # track another evaluation (to trigger cache flush)
        cache.track_feature_evaluation(environment_1_id, feature_1_name, 1)

        # Then - counts are flushed to InfluxDB, grouped by environment
        mocked_track_feature_evaluation_influxdb_task.delay.assert_has_calls(
            [
                mocker.call(
                    kwargs={
                        "environment_id": environment_1_id,
                        "feature_evaluations": {
                            feature_1_name: 11,
                            feature_2_name: 10,
                        },
                    },
                ),
                mocker.call(
                    kwargs={
                        "environment_id": environment_2_id,
                        "feature_evaluations": {feature_2_name: 10},
                    },
                ),
            ]
        )
        # task responsible for tracking evaluation using postgres was not called
        assert not mocked_track_evaluation_task.delay.called

        # Next, let's enable postgres tracking
        settings.USE_POSTGRES_FOR_ANALYTICS = True

        # reset the mock
        mocked_track_feature_evaluation_influxdb_task.reset_mock()

        # Track another evaluation
        cache.track_feature_evaluation(environment_1_id, feature_1_name, 1)

        # move time forward again
        frozen_time.tick(settings.FEATURE_EVALUATION_CACHE_SECONDS + 1)

        # track another one (to trigger cache flush)
        cache.track_feature_evaluation(environment_1_id, feature_1_name, 1)

        # Assert that the call was made with only the data tracked after the flush interval.
        mocked_track_evaluation_task.delay.assert_called_once_with(
            kwargs={
                "environment_id": environment_1_id,
                "feature_evaluations": {feature_1_name: 2},
            }
        )
        # and the task for influx was not called
        assert not mocked_track_feature_evaluation_influxdb_task.delay.called
50 changes: 8 additions & 42 deletions api/tests/unit/app_analytics/test_unit_app_analytics_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,43 +36,16 @@ def test_sdk_analytics_does_not_allow_bad_data(mocker, settings, environment):

view = SDKAnalyticsFlags(request=request)

mocked_track_feature_eval = mocker.patch(
"app_analytics.views.track_feature_evaluation_influxdb"
mocked_feature_eval_cache = mocker.patch(
"app_analytics.views.feature_evaluation_cache"
)

# When
response = view.post(request)

# Then
assert response.status_code == status.HTTP_200_OK
mocked_track_feature_eval.assert_not_called()


def test_sdk_analytics_allows_valid_data(mocker, settings, environment, feature):
# Given
settings.INFLUXDB_TOKEN = "some-token"

data = {feature.name: 12}
request = mocker.MagicMock(
data=data,
environment=environment,
query_params={},
)

view = SDKAnalyticsFlags(request=request)

mocked_track_feature_eval = mocker.patch(
"app_analytics.views.track_feature_evaluation_influxdb"
)

# When
response = view.post(request)

# Then
assert response.status_code == status.HTTP_200_OK
mocked_track_feature_eval.run_in_thread.assert_called_once_with(
args=(environment.id, data)
)
mocked_feature_eval_cache.track_feature_evaluation.assert_not_called()


def test_get_usage_data(mocker, admin_client, organisation):
Expand Down Expand Up @@ -432,24 +405,20 @@ def test_set_sdk_analytics_flags_without_identifier(
assert feature_evaluation_raw.evaluation_count is feature_request_count


def test_set_sdk_analytics_flags_v1_to_influxdb(
def test_sdk_analytics_flags_v1(
api_client: APIClient,
environment: Environment,
feature: Feature,
identity: Identity,
settings: SettingsWrapper,
mocker: MockerFixture,
) -> None:
# Given
settings.INFLUXDB_TOKEN = "some-token"

url = reverse("api-v1:analytics-flags")
api_client.credentials(HTTP_X_ENVIRONMENT_KEY=environment.api_key)
feature_request_count = 2
data = {feature.name: feature_request_count}

mocked_track_feature_eval = mocker.patch(
"app_analytics.views.track_feature_evaluation_influxdb"
mocked_feature_evaluation_cache = mocker.patch(
"app_analytics.views.feature_evaluation_cache"
)

# When
Expand All @@ -459,9 +428,6 @@ def test_set_sdk_analytics_flags_v1_to_influxdb(

# Then
assert response.status_code == status.HTTP_200_OK
mocked_track_feature_eval.run_in_thread.assert_called_once_with(
args=(
environment.id,
data,
)
mocked_feature_evaluation_cache.track_feature_evaluation.assert_called_once_with(
environment.id, feature.name, feature_request_count
)
Loading