Skip to content

Commit fe83bfd

Browse files
committed
feat(analytics): Add command to migrate analytics data to pg
Add command to migrate feature analytics data from influxdb to postgres
1 parent 9e3746b commit fe83bfd

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import argparse
2+
from typing import Any
3+
4+
from app_analytics.migrate_to_pg import migrate_feature_evaluations
5+
from django.core.management import BaseCommand
6+
7+
8+
class Command(BaseCommand):
9+
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
10+
parser.add_argument(
11+
"--migrate-till",
12+
type=int,
13+
dest="migrate_till",
14+
help="Migrate data till n days ago",
15+
default=30,
16+
)
17+
18+
def handle(self, *args: Any, migrate_till: int, **options):
19+
migrate_feature_evaluations(migrate_till)

api/app_analytics/migrate_to_pg.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from app_analytics.analytics_db_service import ANALYTICS_READ_BUCKET_SIZE
2+
from app_analytics.influxdb_wrapper import influxdb_client, read_bucket
3+
from app_analytics.models import FeatureEvaluationBucket
4+
5+
6+
def migrate_feature_evaluations(migrate_till: int = 30):
7+
query_api = influxdb_client.query_api()
8+
9+
for i in range(migrate_till):
10+
range_start = f"-{i+1}d"
11+
range_stop = f"-{i}d"
12+
query = f"from (bucket: {read_bucket}) |> range(start: {range_start}, stop: {range_stop})"
13+
14+
result = query_api.query(query)
15+
16+
feature_evaluations = []
17+
for table in result:
18+
for record in table.records:
19+
feature_evaluations.append(
20+
FeatureEvaluationBucket(
21+
feature_name=record.values["feature_id"],
22+
bucket_size=ANALYTICS_READ_BUCKET_SIZE,
23+
created_at=record.get_time(),
24+
total_count=record.get_value(),
25+
environment_id=record.values["environment_id"],
26+
)
27+
)
28+
FeatureEvaluationBucket.objects.bulk_create(feature_evaluations)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import pytest
2+
from app_analytics.migrate_to_pg import migrate_feature_evaluations
3+
from app_analytics.models import FeatureEvaluationBucket
4+
from django.conf import settings
5+
from django.utils import timezone
6+
from pytest_mock import MockerFixture
7+
8+
9+
@pytest.mark.skipif(
10+
"analytics" not in settings.DATABASES,
11+
reason="Skip test if analytics database is configured",
12+
)
13+
@pytest.mark.django_db(databases=["analytics", "default"])
14+
def test_migrate_feature_evaluations(mocker: MockerFixture):
15+
# Given
16+
feature_name = "test_feature_one"
17+
environment_id = "1"
18+
19+
# mock the read bucket name
20+
read_bucket = "test_bucket"
21+
mocker.patch("app_analytics.migrate_to_pg.read_bucket", read_bucket)
22+
23+
# Next, mock the influx client and create some records
24+
mock_influxdb_client = mocker.patch("app_analytics.migrate_to_pg.influxdb_client")
25+
mock_query_api = mock_influxdb_client.query_api.return_value
26+
mock_tables = []
27+
for i in range(3):
28+
mock_record = mocker.MagicMock(
29+
values={"feature_id": feature_name, "environment_id": environment_id},
30+
spec_set=["values", "get_time", "get_value"],
31+
)
32+
mock_record.get_time.return_value = timezone.now() - timezone.timedelta(days=i)
33+
mock_record.get_value.return_value = 100
34+
35+
mock_table = mocker.MagicMock(records=[mock_record], spec_set=["records"])
36+
mock_tables.append(mock_table)
37+
38+
mock_query_api.query.side_effect = [[table] for table in mock_tables]
39+
40+
# When
41+
migrate_feature_evaluations(migrate_till=3)
42+
43+
# Then - only 3 records should be created
44+
assert FeatureEvaluationBucket.objects.count() == 3
45+
assert (
46+
FeatureEvaluationBucket.objects.filter(
47+
feature_name=feature_name,
48+
environment_id=environment_id,
49+
bucket_size=15,
50+
total_count=100,
51+
).count()
52+
== 3
53+
)
54+
# And, the query should have been called 3 times
55+
mock_query_api.assert_has_calls(
56+
[
57+
mocker.call.query(
58+
f"from (bucket: {read_bucket}) |> range(start: -1d, stop: -0d)"
59+
),
60+
mocker.call.query(
61+
f"from (bucket: {read_bucket}) |> range(start: -2d, stop: -1d)"
62+
),
63+
mocker.call.query(
64+
f"from (bucket: {read_bucket}) |> range(start: -3d, stop: -2d)"
65+
),
66+
]
67+
)

0 commit comments

Comments
 (0)