-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread-log-fluentbit.py
65 lines (51 loc) · 1.99 KB
/
read-log-fluentbit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import csv
from joblib import Parallel, delayed
from fluent import sender
from os import listdir
import json
def send_logs(logger, directory, filename):
    """Parse one audit-log file and emit every usable row through ``logger``.

    Each line of the file is parsed on its own as a single CSV row. Rows
    with more than one field are converted to a dict with the keys
    ``timestamp``, ``serverhost``, ``username``, ``host``, ``connectionid``,
    ``queryid``, ``operation``, ``database``, ``object`` and ``retcode``
    (taken from columns 0-9 in that order) and sent with
    ``logger.emit("rds", record)``.

    Args:
        logger: Object exposing ``emit(tag, record)``; the script entry point
            passes a ``fluent.sender.FluentSender``.
        directory: Directory that contains the file.
        filename: Name of the file inside ``directory``.

    Returns:
        None. Rows that cannot be converted or emitted are printed together
        with the error and skipped; they do not abort the file.
    """
    print(f"exporting {filename}")
    with open(f"{directory}/{filename}", "r") as file:
        # Iterate lazily instead of readlines() so large logs are not held
        # in memory all at once.
        for line in file:
            # Lines keep their trailing newline, so compare the stripped text.
            text = line.rstrip("\n")
            if text == "" or text == " [Your log message was truncated]":
                continue

            row = list(csv.reader([line]))[0]
            if len(row) <= 1:
                # A single field cannot be an audit record.
                continue

            try:
                log = {
                    "timestamp": int(row[0]),
                    "serverhost": row[1],
                    "username": row[2],
                    "host": row[3],
                    "connectionid": int(row[4]),
                    "queryid": int(row[5]),
                    "operation": row[6],
                    "database": row[7],
                    "object": row[8],
                }
                # NOTE(review): this statement is forced to retcode 0 without
                # reading column 9 - presumably its rows carry no usable
                # return code; confirm against the log format.
                if log["object"].lower() == "'set autocommit=0'":
                    log["retcode"] = 0
                else:
                    try:
                        log["retcode"] = int(row[9])
                    except ValueError:
                        # Non-numeric return code: fall back silently.
                        log["retcode"] = -1
                    except Exception as e:
                        # E.g. the column is missing: fall back and report.
                        log["retcode"] = -1
                        print(f"{e} : {line}")
                logger.emit("rds", log)
            except Exception as e:
                # Deliberately broad: one malformed row (or a failed emit)
                # is reported and skipped so the rest of the file is still
                # processed.
                print(f"{e} : {line}")
if __name__ == "__main__":
    # Script entry point: send every file found in the raw-data directory
    # through one shared FluentSender, using up to 8 joblib worker threads.
    logger = sender.FluentSender(
        "rds", host="0.0.0.0", port=24224, nanosecond_precision=True
    )
    try:
        path = "./.raw_data/ffi-krdv-l2alpha-amy-prd-node-mstr/"
        # To replay a single file, replace this with an explicit list such
        # as ["audit.log.0.2024-03-18-13-21.2"].
        file_list = listdir(path)
        Parallel(n_jobs=8, prefer="threads")(
            delayed(send_logs)(logger=logger, directory=path, filename=file)
            for file in file_list
        )
    finally:
        # Close the sender even when listing the directory or a worker
        # raises, so it is never left open on failure.
        logger.close()