-
Notifications
You must be signed in to change notification settings - Fork 429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add support for replicas and cross region replicas #3300
Changes from 18 commits
637092f
65ba9d6
9977571
59329b4
088e32e
7e8b205
dd76946
0f9f357
ce75308
c941310
2953dcf
6c8e216
345cc7b
266935a
e0b1414
e4df146
644e4c7
7b8aa42
1f9ab5e
3c294fa
01a2a70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,80 @@ | ||
import logging | ||
import random | ||
from enum import Enum | ||
|
||
from django.conf import settings | ||
from django.core.cache import cache | ||
from django.db import connections | ||
|
||
logger = logging.getLogger(__name__) | ||
CONNECTION_CHECK_CACHE_TTL = 2 | ||
|
||
|
||
class ReplicaReadStrategy(Enum): | ||
DISTRIBUTED = "DISTRIBUTED" | ||
SEQUENTIAL = "SEQUENTIAL" | ||
|
||
|
||
class ImproperlyConfiguredError(RuntimeError): | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't love that this exception isn't defined in an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm happy to move it there. |
||
|
||
|
||
def connection_check(database: str) -> bool: | ||
try: | ||
conn = connections.create_connection(database) | ||
conn.connect() | ||
usable = conn.is_usable() | ||
if not usable: | ||
logger.warning( | ||
f"Unable to access database {database} during connection check" | ||
) | ||
except Exception: | ||
usable = False | ||
logger.error( | ||
"Encountered exception during connection", | ||
exc_info=True, | ||
) | ||
|
||
if usable: | ||
cache.set( | ||
f"db_connection_active.{database}", "online", CONNECTION_CHECK_CACHE_TTL | ||
) | ||
matthewelwell marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
cache.set( | ||
f"db_connection_active.{database}", "offline", CONNECTION_CHECK_CACHE_TTL | ||
) | ||
|
||
return usable | ||
|
||
|
||
class PrimaryReplicaRouter: | ||
def db_for_read(self, model, **hints): | ||
if settings.NUM_DB_REPLICAS == 0: | ||
return "default" | ||
return random.choice( | ||
[f"replica_{i}" for i in range(1, settings.NUM_DB_REPLICAS + 1)] | ||
|
||
replicas = [f"replica_{i}" for i in range(1, settings.NUM_DB_REPLICAS + 1)] | ||
replica = self._get_replica(replicas) | ||
if replica: | ||
# This return is the most likely as replicas should be | ||
# online and properly functioning. | ||
return replica | ||
|
||
# Since no replicas are available, fall back to the cross | ||
# region replicas which have worse availability. | ||
cross_region_replicas = [ | ||
f"cross_region_replica_{i}" | ||
for i in range(1, settings.NUM_CROSS_REGION_DB_REPLICAS + 1) | ||
] | ||
khvn26 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
cross_region_replica = self._get_replica(cross_region_replicas) | ||
if cross_region_replica: | ||
return cross_region_replica | ||
|
||
# No available replicas, so fallback to the default. | ||
logger.warning( | ||
"Unable to serve any available replicas, falling back to default database" | ||
) | ||
return "default" | ||
|
||
def db_for_write(self, model, **hints): | ||
return "default" | ||
|
@@ -30,6 +95,27 @@ def allow_relation(self, obj1, obj2, **hints): | |
def allow_migrate(self, db, app_label, model_name=None, **hints): | ||
return db == "default" | ||
|
||
def _get_replica(self, replicas: list[str]) -> None | str: | ||
while replicas: | ||
if settings.REPLICA_READ_STRATEGY == ReplicaReadStrategy.DISTRIBUTED.value: | ||
database = random.choice(replicas) | ||
elif settings.REPLICA_READ_STRATEGY == ReplicaReadStrategy.SEQUENTIAL.value: | ||
database = replicas[0] | ||
else: | ||
raise ImproperlyConfiguredError( | ||
f"Unknown REPLICA_READ_STRATEGY {settings.REPLICA_READ_STRATEGY}" | ||
) | ||
|
||
replicas.remove(database) | ||
db_cache = cache.get(f"db_connection_active.{database}") | ||
if db_cache == "online": | ||
return database | ||
if db_cache == "offline": | ||
continue | ||
|
||
if connection_check(database): | ||
return database | ||
|
||
|
||
class AnalyticsRouter: | ||
route_app_labels = ["app_analytics"] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
from pytest_django.fixtures import SettingsWrapper | ||
from pytest_mock import MockerFixture | ||
|
||
from app.routers import ( | ||
PrimaryReplicaRouter, | ||
ReplicaReadStrategy, | ||
connection_check, | ||
) | ||
from users.models import FFAdminUser | ||
|
||
|
||
def test_connection_check_to_default_database(db: None, reset_cache: None) -> None: | ||
# When | ||
connection_check_works = connection_check("default") | ||
|
||
# Then | ||
assert connection_check_works is True | ||
|
||
|
||
def test_replica_router_db_for_read_with_one_offline_replica( | ||
db: None, | ||
settings: SettingsWrapper, | ||
mocker: MockerFixture, | ||
reset_cache: None, | ||
) -> None: | ||
# Given | ||
settings.NUM_DB_REPLICAS = 4 | ||
|
||
# Set unused cross regional db for testing non-inclusion. | ||
settings.NUM_CROSS_REGION_DB_REPLICAS = 2 | ||
settings.REPLICA_READ_STRATEGY = ReplicaReadStrategy.DISTRIBUTED.value | ||
|
||
conn_patch = mocker.MagicMock() | ||
conn_patch.is_usable.side_effect = (False, True) | ||
create_connection_patch = mocker.patch( | ||
"app.routers.connections.create_connection", return_value=conn_patch | ||
) | ||
|
||
router = PrimaryReplicaRouter() | ||
|
||
# When | ||
result = router.db_for_read(FFAdminUser) | ||
|
||
# Then | ||
# Read strategy DISTRIBUTED is random, so just this is a check | ||
# against loading the primary or one of the cross region replicas | ||
assert result.startswith("replica_") | ||
|
||
# Check that the number of replica call counts is as expected. | ||
conn_call_count = 2 | ||
assert create_connection_patch.call_count == conn_call_count | ||
assert conn_patch.is_usable.call_count == conn_call_count | ||
|
||
|
||
def test_replica_router_db_for_read_with_local_offline_replicas( | ||
db: None, | ||
settings: SettingsWrapper, | ||
mocker: MockerFixture, | ||
reset_cache: None, | ||
) -> None: | ||
# Given | ||
settings.NUM_DB_REPLICAS = 4 | ||
|
||
# Use cross regional db for fallback after replicas. | ||
settings.NUM_CROSS_REGION_DB_REPLICAS = 2 | ||
settings.REPLICA_READ_STRATEGY = ReplicaReadStrategy.DISTRIBUTED.value | ||
|
||
conn_patch = mocker.MagicMock() | ||
|
||
# All four replicas go offline and so does one of the cross | ||
# regional replica as well, before finally the last cross | ||
# region replica is finally connected to. | ||
conn_patch.is_usable.side_effect = ( | ||
False, | ||
False, | ||
False, | ||
False, | ||
False, | ||
True, | ||
) | ||
create_connection_patch = mocker.patch( | ||
"app.routers.connections.create_connection", return_value=conn_patch | ||
) | ||
|
||
router = PrimaryReplicaRouter() | ||
|
||
# When | ||
result = router.db_for_read(FFAdminUser) | ||
|
||
# Then | ||
# Read strategy DISTRIBUTED is random, so just this is a check | ||
# against loading the primary or one of the cross region replicas | ||
assert result.startswith("cross_region_replica_") | ||
|
||
# Check that the number of replica call counts is as expected. | ||
conn_call_count = 6 | ||
assert create_connection_patch.call_count == conn_call_count | ||
assert conn_patch.is_usable.call_count == conn_call_count | ||
|
||
|
||
def test_replica_router_db_for_read_with_all_offline_replicas( | ||
db: None, | ||
settings: SettingsWrapper, | ||
mocker: MockerFixture, | ||
reset_cache: None, | ||
) -> None: | ||
# Given | ||
settings.NUM_DB_REPLICAS = 4 | ||
settings.NUM_CROSS_REGION_DB_REPLICAS = 2 | ||
settings.REPLICA_READ_STRATEGY = ReplicaReadStrategy.DISTRIBUTED.value | ||
|
||
conn_patch = mocker.MagicMock() | ||
|
||
# All replicas go offline. | ||
conn_patch.is_usable.return_value = False | ||
create_connection_patch = mocker.patch( | ||
"app.routers.connections.create_connection", return_value=conn_patch | ||
) | ||
|
||
router = PrimaryReplicaRouter() | ||
|
||
# When | ||
result = router.db_for_read(FFAdminUser) | ||
|
||
# Then | ||
# Fallback to primary database if all replicas are offline. | ||
assert result == "default" | ||
|
||
# Check that the number of replica call counts is as expected. | ||
conn_call_count = 6 | ||
assert create_connection_patch.call_count == conn_call_count | ||
assert conn_patch.is_usable.call_count == conn_call_count | ||
|
||
|
||
def test_replica_router_db_with_sequential_read( | ||
db: None, | ||
settings: SettingsWrapper, | ||
mocker: MockerFixture, | ||
reset_cache: None, | ||
) -> None: | ||
# Given | ||
settings.NUM_DB_REPLICAS = 100 | ||
settings.NUM_CROSS_REGION_DB_REPLICAS = 2 | ||
settings.REPLICA_READ_STRATEGY = ReplicaReadStrategy.SEQUENTIAL.value | ||
|
||
conn_patch = mocker.MagicMock() | ||
|
||
# First replica is offline, so must fall back to second one. | ||
conn_patch.is_usable.side_effect = (False, True) | ||
create_connection_patch = mocker.patch( | ||
"app.routers.connections.create_connection", return_value=conn_patch | ||
) | ||
|
||
router = PrimaryReplicaRouter() | ||
|
||
# When | ||
result = router.db_for_read(FFAdminUser) | ||
|
||
# Then | ||
# Fallback from first replica to second one. | ||
assert result == "replica_2" | ||
|
||
# Check that the number of replica call counts is as expected. | ||
conn_call_count = 2 | ||
assert create_connection_patch.call_count == conn_call_count | ||
assert conn_patch.is_usable.call_count == conn_call_count | ||
|
||
|
||
def test_replica_router_db_no_replicas( | ||
db: None, | ||
settings: SettingsWrapper, | ||
mocker: MockerFixture, | ||
reset_cache: None, | ||
) -> None: | ||
# Given | ||
settings.NUM_DB_REPLICAS = 0 | ||
settings.NUM_CROSS_REGION_DB_REPLICAS = 0 | ||
|
||
conn_patch = mocker.MagicMock() | ||
|
||
# All replicas should be ignored. | ||
create_connection_patch = mocker.patch( | ||
"app.routers.connections.create_connection", return_value=conn_patch | ||
) | ||
|
||
router = PrimaryReplicaRouter() | ||
|
||
# When | ||
result = router.db_for_read(FFAdminUser) | ||
|
||
# Then | ||
# Should always use primary database. | ||
assert result == "default" | ||
conn_call_count = 0 | ||
assert create_connection_patch.call_count == conn_call_count | ||
assert conn_patch.is_usable.call_count == conn_call_count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: imo these should be separated by a (blank) new line whitespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it looks fine my way, but I don't mind doing it your way either so I've updated it.