Skip to content

Commit

Permalink
misc fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gagantrivedi committed Nov 28, 2023
1 parent 31cf23f commit fc2a574
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 21 deletions.
2 changes: 1 addition & 1 deletion api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@
# Real time(server sent events) settings
SSE_SERVER_BASE_URL = env.str("SSE_SERVER_BASE_URL", None)
SSE_AUTHENTICATION_TOKEN = env.str("SSE_AUTHENTICATION_TOKEN", None)
AWS_FASTLY_LOGS_BUCKET_NAME = env.str("AWS_FASTLY_LOGS_BUCKET_NAME", None)
AWS_SSE_LOGS_BUCKET_NAME = env.str("AWS_SSE_LOGS_BUCKET_NAME", None)
SSE_INFLUXDB_BUCKET = env.str("SSE_INFLUXDB_BUCKET", None)


Expand Down
2 changes: 1 addition & 1 deletion api/app/settings/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
"user": "100000/day",
}

AWS_FASTLY_LOGS_BUCKET_NAME = "test_bucket"
AWS_SSE_LOGS_BUCKET_NAME = "test_bucket"
7 changes: 7 additions & 0 deletions api/sse/dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dataclasses import dataclass


@dataclass
class SSEAccessLogs:
generated_at: str # ISO 8601
api_key: str
4 changes: 2 additions & 2 deletions api/sse/sse_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
s3 = boto3.resource("s3")
gpg = gnupg.GPG()

bucket = s3.Bucket(settings.AWS_FASTLY_LOGS_BUCKET_NAME)
bucket = s3.Bucket(settings.AWS_SSE_LOGS_BUCKET_NAME)


def _sse_enabled(get_project_from_first_arg=lambda obj: obj.project):
Expand Down Expand Up @@ -66,4 +66,4 @@ def stream_access_logs() -> Generator[SSEAccessLogs, None, None]:
for row in reader:
yield SSEAccessLogs(*row)

# log_file.delete()
log_file.delete()
34 changes: 18 additions & 16 deletions api/sse/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,24 @@ def send_environment_update_message(environment_key: str, updated_at):
response.raise_for_status()


@register_recurring_task(
run_every=timedelta(seconds=60),
)
def update_sse_usage():
with influxdb_client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=2000)
) as write_api:
for log in sse_service.stream_access_logs():
environment = Environment.get_from_cache(log.api_key)

if not environment:
logger.warning("Invalid api_key %s", log.api_key)
continue

record = _get_influx_point(environment, log.generated_at)
write_api.write(bucket=settings.SSE_INFLUXDB_BUCKET, record=record)
if settings.AWS_SSE_LOGS_BUCKET_NAME:

@register_recurring_task(
run_every=timedelta(seconds=60),
)
def update_sse_usage():
with influxdb_client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=2000)
) as write_api:
for log in sse_service.stream_access_logs():
environment = Environment.get_from_cache(log.api_key)

if not environment:
logger.warning("Invalid api_key %s", log.api_key)
continue

record = _get_influx_point(environment, log.generated_at)
write_api.write(bucket=settings.SSE_INFLUXDB_BUCKET, record=record)


def _get_influx_point(environment: Environment, event_time: str) -> Point:
Expand Down
2 changes: 1 addition & 1 deletion api/tests/unit/sse/test_sse_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def test_stream_access_logs(mocker: MockerFixture, settings):
patch_resource(s3)

# Next, let's create a bucket and add some objects to it
bucket_name = settings.AWS_FASTLY_LOGS_BUCKET_NAME
bucket_name = settings.AWS_SSE_LOGS_BUCKET_NAME
s3_client = boto3.client("s3", region_name="eu-west-2")
s3_client.create_bucket(
Bucket=bucket_name,
Expand Down

0 comments on commit fc2a574

Please sign in to comment.