Skip to content

Commit

Permalink
feat: Switch existing task processor health checks to new liveness pr…
Browse files Browse the repository at this point in the history
…obe (#5161)

Co-authored-by: Rodrigo López Dato <[email protected]>
  • Loading branch information
khvn26 and rolodato authored Mar 5, 2025
1 parent 9460dc7 commit 647712c
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 107 deletions.
11 changes: 7 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
# Build a SaaS API image:
# $ GH_TOKEN=$(gh auth token) docker build -t flagsmith-saas-api:dev --target saas-api \
# --secret="id=sse_pgp_pkey,src=./sse_pgp_pkey.key"\
# --secret="id=github_private_cloud_token,env=GH_TOKEN" .
# --secret="id=github_private_cloud_token,env=GH_TOKEN" .

# Build a Private Cloud Unified image:
# $ GH_TOKEN=$(gh auth token) docker build -t flagsmith-private-cloud:dev --target private-cloud-unified \
# --secret="id=github_private_cloud_token,env=GH_TOKEN" .
# --secret="id=github_private_cloud_token,env=GH_TOKEN" .

# Table of Contents
# Stages are described as stage-name [dependencies]
Expand Down Expand Up @@ -90,7 +90,7 @@ ARG PYTHON_VERSION
RUN apk add build-base linux-headers curl git \
python-${PYTHON_VERSION} \
python-${PYTHON_VERSION}-dev \
py${PYTHON_VERSION}-pip
py${PYTHON_VERSION}-pip

COPY api/pyproject.toml api/poetry.lock api/Makefile ./
ENV POETRY_VIRTUALENVS_IN_PROJECT=true \
Expand Down Expand Up @@ -119,7 +119,7 @@ FROM wolfi-base AS api-runtime

# Install Python and make it available to venv entrypoints
ARG PYTHON_VERSION
RUN apk add python-${PYTHON_VERSION} && \
RUN apk add curl python-${PYTHON_VERSION} && \
mkdir /build/ && ln -s /usr/local/ /build/.venv

WORKDIR /app
Expand All @@ -139,6 +139,9 @@ ENTRYPOINT ["/app/scripts/run-docker.sh"]

CMD ["migrate-and-serve"]

HEALTHCHECK --interval=2s --timeout=2s --retries=3 --start-period=20s \
CMD curl -f http://localhost:8000/health/liveness || exit 1

# * api-runtime-private [api-runtime]
FROM api-runtime AS api-runtime-private

Expand Down
5 changes: 4 additions & 1 deletion api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,14 +986,17 @@
# Used to control the size(number of identities) of the project that can be self migrated to edge
MAX_SELF_MIGRATABLE_IDENTITIES = env.int("MAX_SELF_MIGRATABLE_IDENTITIES", 100000)

# RUN_BY_PROCESSOR is set by the task processor entrypoint
TASK_PROCESSOR_MODE = env.bool("RUN_BY_PROCESSOR", False)

# Setting to allow asynchronous tasks to be run synchronously for testing purposes
# or in a separate thread for self-hosted users
TASK_RUN_METHOD = env.enum(
"TASK_RUN_METHOD",
type=TaskRunMethod,
default=(
TaskRunMethod.TASK_PROCESSOR.value
if env.bool("RUN_BY_PROCESSOR", False)
if TASK_PROCESSOR_MODE
else TaskRunMethod.SEPARATE_THREAD.value
),
)
Expand Down
103 changes: 54 additions & 49 deletions api/app/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,67 +10,72 @@
from . import views

urlpatterns = [
re_path(r"^api/v1/", include("api.urls.deprecated", namespace="api-deprecated")),
re_path(r"^api/v1/", include("api.urls.v1", namespace="api-v1")),
re_path(r"^api/v2/", include("api.urls.v2", namespace="api-v2")),
re_path(r"^admin/", admin.site.urls),
re_path(r"^health/liveness/?", views.version_info),
re_path(r"^health/readiness/?", include("health_check.urls")),
re_path(r"^health", include("health_check.urls", namespace="health")),
# Aptible health checks must be on /healthcheck and cannot redirect
# see https://www.aptible.com/docs/core-concepts/apps/connecting-to-apps/app-endpoints/https-endpoints/health-checks
path("healthcheck", include("health_check.urls", namespace="aptible")),
re_path(r"^version", views.version_info, name="version-info"),
re_path(
r"^sales-dashboard/",
include("sales_dashboard.urls", namespace="sales_dashboard"),
),
# this url is used to generate email content for the password reset workflow
re_path(
r"^password-reset/confirm/(?P<uidb64>[0-9A-Za-z_\-]+)/(?P<token>[0-9A-Za-z]{1,"
r"13}-[0-9A-Za-z]{1,20})/$",
password_reset_redirect,
name="password_reset_confirm",
),
re_path(
r"^config/project-overrides",
views.project_overrides,
name="project_overrides",
),
path("processor/", include("task_processor.urls")),
path(
"robots.txt",
TemplateView.as_view(template_name="robots.txt", content_type="text/plain"),
),
]

if settings.DEBUG:
import debug_toolbar # type: ignore[import-untyped,unused-ignore]
if not settings.TASK_PROCESSOR_MODE:
urlpatterns += [
re_path(
r"^api/v1/", include("api.urls.deprecated", namespace="api-deprecated")
),
re_path(r"^api/v1/", include("api.urls.v1", namespace="api-v1")),
re_path(r"^api/v2/", include("api.urls.v2", namespace="api-v2")),
re_path(r"^admin/", admin.site.urls),
re_path(r"^health", include("health_check.urls", namespace="health")),
# Aptible health checks must be on /healthcheck and cannot redirect
# see https://www.aptible.com/docs/core-concepts/apps/connecting-to-apps/app-endpoints/https-endpoints/health-checks
path("healthcheck", include("health_check.urls", namespace="aptible")),
re_path(r"^version", views.version_info, name="version-info"),
re_path(
r"^sales-dashboard/",
include("sales_dashboard.urls", namespace="sales_dashboard"),
),
# this url is used to generate email content for the password reset workflow
re_path(
r"^password-reset/confirm/(?P<uidb64>[0-9A-Za-z_\-]+)/(?P<token>[0-9A-Za-z]{1,"
r"13}-[0-9A-Za-z]{1,20})/$",
password_reset_redirect,
name="password_reset_confirm",
),
re_path(
r"^config/project-overrides",
views.project_overrides,
name="project_overrides",
),
path(
"robots.txt",
TemplateView.as_view(template_name="robots.txt", content_type="text/plain"),
),
]

if settings.DEBUG:
urlpatterns = [
re_path(r"^__debug__/", include(debug_toolbar.urls)), # type: ignore[attr-defined,unused-ignore]
re_path(r"^__debug__/", include("debug_toolbar.urls")),
] + urlpatterns

if settings.SAML_INSTALLED:
urlpatterns.append(path("api/v1/auth/saml/", include("saml.urls")))
if settings.SAML_INSTALLED: # pragma: no cover
urlpatterns += [
path("api/v1/auth/saml/", include("saml.urls")),
]

if settings.WORKFLOWS_LOGIC_INSTALLED: # pragma: no cover
workflow_views = importlib.import_module("workflows_logic.views")
urlpatterns.extend(
[
path("api/v1/features/workflows/", include("workflows_logic.urls")),
path(
"api/v1/environments/<str:environment_api_key>/create-change-request/",
workflow_views.create_change_request,
name="create-change-request",
),
path(
"api/v1/environments/<str:environment_api_key>/list-change-requests/",
workflow_views.list_change_requests,
name="list-change-requests",
),
]
)
urlpatterns += [
path("api/v1/features/workflows/", include("workflows_logic.urls")),
path(
"api/v1/environments/<str:environment_api_key>/create-change-request/",
workflow_views.create_change_request,
name="create-change-request",
),
path(
"api/v1/environments/<str:environment_api_key>/list-change-requests/",
workflow_views.list_change_requests,
name="list-change-requests",
),
]


if settings.SERVE_FE_ASSETS: # pragma: no cover
# add route to serve FE assets for any unrecognised paths
Expand Down
7 changes: 7 additions & 0 deletions api/manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
import sys

if __name__ == "__main__":
# Backwards compatibility for task-processor health checks
# See https://github.com/Flagsmith/flagsmith-task-processor/issues/24
if "checktaskprocessorthreadhealth" in sys.argv:
import scripts.healthcheck

scripts.healthcheck.main()

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings.local")

try:
Expand Down
7 changes: 4 additions & 3 deletions api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ exclude_also = [
]

[tool.coverage.run]
omit = ["scripts/*"]
omit = ["scripts/*", "manage.py"]

[tool.pytest.ini_options]
addopts = ['--ds=app.settings.test', '-vvvv', '-p', 'no:warnings']
Expand Down Expand Up @@ -146,7 +146,7 @@ pygithub = "2.1.1"
hubspot-api-client = "^8.2.1"
djangorestframework-dataclasses = "^1.3.1"
pyotp = "^2.9.0"
flagsmith-task-processor = { git = "https://github.com/Flagsmith/flagsmith-task-processor", tag = "v1.2.2" }
flagsmith-task-processor = { git = "https://github.com/Flagsmith/flagsmith-task-processor", tag = "v1.3.1" }
flagsmith-common = { git = "https://github.com/Flagsmith/flagsmith-common", tag = "v1.4.2" }
tzdata = "^2024.1"
djangorestframework-simplejwt = "^5.3.1"
Expand Down
Empty file added api/scripts/__init__.py
Empty file.
24 changes: 21 additions & 3 deletions api/scripts/healthcheck.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,26 @@
import logging
import sys

import requests

url = "http://localhost:8000/health/liveness"
status = requests.get(url).status_code
HEALTH_LIVENESS_URL = "http://localhost:8000/health/liveness"

sys.exit(0 if 200 >= status < 300 else 1)

logger = logging.getLogger(__name__)


def main() -> None:
    """Deprecated health-check entrypoint kept for backwards compatibility.

    Queries the liveness endpoint and exits with status 0 on any 2xx
    response, 1 otherwise. Retained so that the legacy
    ``checktaskprocessorthreadhealth`` invocation keeps working.
    """
    logger.warning(
        f"This healthcheck, invoked by {' '.join(sys.argv)}, is deprecated. "
        f"Use the `{HEALTH_LIVENESS_URL}` endpoint instead."
    )
    status_code = requests.get(HEALTH_LIVENESS_URL).status_code

    # Log any non-2xx response before exiting non-zero. (The condition now
    # matches the exit condition below; previously only `!= 200` was logged.)
    if not 200 <= status_code < 300:
        logger.error(f"Health check failed with status {status_code}")

    # BUGFIX: the original `200 >= status_code < 300` chains as
    # `(200 >= status_code) and (status_code < 300)`, which treats any
    # status <= 200 as success and any 201-299 as failure. The intended
    # check is the standard 2xx range test below.
    sys.exit(0 if 200 <= status_code < 300 else 1)


if __name__ == "__main__":
main()
23 changes: 16 additions & 7 deletions api/scripts/run-docker.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#!/bin/sh
set -e

# common environment variables
ACCESS_LOG_FORMAT=${ACCESS_LOG_FORMAT:-'%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %({origin}i)s %({access-control-allow-origin}o)s'}
GUNICORN_LOGGER_CLASS=${GUNICORN_LOGGER_CLASS:-'util.logging.GunicornJsonCapableLogger'}

waitfordb() {
if [ -z "${SKIP_WAIT_FOR_DB}" ]; then
python manage.py waitfordb "$@"
Expand All @@ -27,8 +31,8 @@ serve() {
--workers ${GUNICORN_WORKERS:-3} \
--threads ${GUNICORN_THREADS:-2} \
--access-logfile $ACCESS_LOG_LOCATION \
--logger-class ${GUNICORN_LOGGER_CLASS:-'util.logging.GunicornJsonCapableLogger'} \
--access-logformat ${ACCESS_LOG_FORMAT:-'%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %({origin}i)s %({access-control-allow-origin}o)s'} \
--logger-class $GUNICORN_LOGGER_CLASS \
--access-logformat "$ACCESS_LOG_FORMAT" \
--keep-alive ${GUNICORN_KEEP_ALIVE:-2} \
${STATSD_HOST:+--statsd-host $STATSD_HOST:$STATSD_PORT} \
${STATSD_HOST:+--statsd-prefix $STATSD_PREFIX} \
Expand All @@ -39,11 +43,16 @@ run_task_processor() {
if [ -n "$ANALYTICS_DATABASE_URL" ] || [ -n "$DJANGO_DB_NAME_ANALYTICS" ]; then
waitfordb --waitfor 30 --migrations --database analytics
fi
RUN_BY_PROCESSOR=1 exec python manage.py runprocessor \
--sleepintervalms ${TASK_PROCESSOR_SLEEP_INTERVAL:-500} \
--graceperiodms ${TASK_PROCESSOR_GRACE_PERIOD_MS:-20000} \
--numthreads ${TASK_PROCESSOR_NUM_THREADS:-5} \
--queuepopsize ${TASK_PROCESSOR_QUEUE_POP_SIZE:-10}
RUN_BY_PROCESSOR=1 python manage.py runprocessor \
--sleepintervalms ${TASK_PROCESSOR_SLEEP_INTERVAL_MS:-${TASK_PROCESSOR_SLEEP_INTERVAL:-500}} \
--graceperiodms ${TASK_PROCESSOR_GRACE_PERIOD_MS:-20000} \
--numthreads ${TASK_PROCESSOR_NUM_THREADS:-5} \
--queuepopsize ${TASK_PROCESSOR_QUEUE_POP_SIZE:-10} \
gunicorn \
--bind 0.0.0.0:8000 \
--access-logfile $ACCESS_LOG_LOCATION \
--logger-class $GUNICORN_LOGGER_CLASS \
--access-logformat "$ACCESS_LOG_FORMAT"
}
migrate_analytics_db(){
# if `$ANALYTICS_DATABASE_URL` or DJANGO_DB_NAME_ANALYTICS is set
Expand Down
6 changes: 0 additions & 6 deletions docker-compose.pgpool.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ services:
TASK_RUN_METHOD: TASK_PROCESSOR # other options are: SYNCHRONOUSLY, SEPARATE_THREAD (default)
ports:
- 8000:8000
healthcheck:
test: ['CMD-SHELL', 'python /app/scripts/healthcheck.py']
interval: 2s
timeout: 2s
retries: 20
start_period: 20s
depends_on:
pgpool:
condition: service_healthy
Expand Down
6 changes: 0 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,6 @@ services:
# EMAIL_USE_TLS: 'true' # optional
ports:
- 8000:8000
healthcheck:
test: ['CMD-SHELL', 'python /app/scripts/healthcheck.py']
interval: 2s
timeout: 2s
retries: 20
start_period: 20s
depends_on:
postgres:
condition: service_healthy
Expand Down
41 changes: 15 additions & 26 deletions docs/docs/deployment/configuration/task-processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,44 +47,33 @@ flagsmith_processor:
## Configuring the Processor
The processor exposes a number of configuration options to tune the processor to your needs / setup. These configuration
options are via command line arguments when starting the processor.
options are via command line arguments when starting the processor. The default Flagsmith container entrypoint expects
these options as `TASK_PROCESSOR_`-prefixed environment variables.

| Argument | Description | Default |
| ------------------- | ------------------------------------------------------------------------- | ------- |
| `--sleepintervalms` | The amount of ms each worker should sleep between checking for a new task | 2000 |
| `--numthreads` | The number of worker threads to run per task processor instance | 5 |
| `--graceperiodms` | The amount of ms before a worker thread is considered 'stuck'. | 20000 |
| Environment variable | Argument | Description | Default |
| ---------------------------------- | ------------------- | -------------------------------------------------------------------------- | ------- |
| `TASK_PROCESSOR_SLEEP_INTERVAL_MS` | `--sleepintervalms` | The amount of ms each worker should sleep between checking for a new task. | 500 |
| `TASK_PROCESSOR_NUM_THREADS` | `--numthreads` | The number of worker threads to run per task processor instance. | 5 |
| `TASK_PROCESSOR_GRACE_PERIOD_MS` | `--graceperiodms` | The amount of ms before a worker thread is considered 'stuck'. | 20000 |
| `TASK_PROCESSOR_QUEUE_POP_SIZE` | `--queuepopsize` | The number of enqueued tasks to retrieve for processing for one iteration. | 10 |

## Monitoring

There are a number of options for monitoring the task processor's health.

### Checking Thread / Worker Health
### Health checks

The task processor includes a management command which checks the health of the worker threads which are running tasks.

```
python manage.py checktaskprocessorthreadhealth
```
The command will exit with either a success exit code (0) or a failure exit code (1).
### API to Task Processor health
To monitor that the API can send tasks to the processor and that they are successfully run, there is a custom health
check which can be enabled on the general health endpoint (`GET /health?format=json`). This health check needs to be
enabled manually, which can be done by setting the `ENABLE_TASK_PROCESSOR_HEALTH_CHECK` environment variable to `True`
(in the flagsmith application container, not the task processor). Note that this health check is not considered
"critical" and hence, the endpoint will return a 200 OK regardless of whether the task processor is successfully
processing tasks or not.
A task processor container exposes `/health/readiness` and `/health/liveness` endpoints for readiness and liveness
probes. The endpoints run simple availability checks. To additionally have the readiness probe enqueue a test task
and verify that it runs, set the `ENABLE_TASK_PROCESSOR_HEALTH_CHECK` environment variable to `True`.

### Task statistics

Within the API, there is an endpoint which returns, in JSON format, statistics about the tasks consumed / to be consumed
by the processor. This endpoint is available at `GET /processor/monitoring`. This will respond with the following data:
Both API and Task processor expose an endpoint which returns Task processor statistics in JSON format.
This endpoint is available at `GET /processor/monitoring`. See an example response below:

```json
{
"waiting": 1
"waiting": 1 // The number of tasks waiting in the queue.
}
```

0 comments on commit 647712c

Please sign in to comment.