File: //proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/snapshots/reader.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import datetime
import json
import sys
import lvestats.lib.commons.decorators
import prettytable
from lvestats.lib import dbengine, uidconverter
from lvestats.lib.commons import dateutil
from lvestats.lib.jsonhandler import prepare_data_json
from lvestats.lib.parsers.lve_read_snapshot_argparse import lve_read_snapshot_parser
from lvestats.lib.snapshot import Snapshot
from lvestats.orm.incident import incident
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker
REPORT_HEADER = "Snapshots collected starting from %s to %s for lve id %d @ %s:\n"
REPORT_FOOTER = "Done..\n"
DEFAULT_SERVER_ID = "localhost" # used when nothing is configured or specified in cmdline
def _calculate_period(opts):
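    """
    Resolve the requested time range from the parsed CLI options.

    Precedence: an explicit --period wins, then --timestamp (widened to a
    ~1-second window), then the --from/--to pair. Returns (None, None) when
    --from/--to cannot be parsed.
    """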
if opts.period:
start, end = opts.period
elif opts.timestamp:
start = opts.timestamp
        end = start + 0.999999  # widen the exact timestamp to a ~1-second window
else:
try:
start = dateutil.parse_date(" ".join(opts.ffrom))
end = dateutil.parse_date(" ".join(opts.to))
except ValueError:
print("please use [YY]YY-MM-DD[ HH:MM] format for --from and --to")
return None, None
return start, end
@lvestats.lib.commons.decorators.no_sigpipe
def snapshot_reader_main(config, argv_=None):
parser = lve_read_snapshot_parser()
opts = parser.parse_args(argv_)
if not opts.id and not opts.user:
parser.print_help()
print("One of -u --user or -i --id should be specified")
return 1
if opts.snap_sql_item is not None and (not opts.timestamp or not opts.json):
print("--snap-sql-item can only be used with --timestamp and --json options")
return 1
try:
engine = dbengine.make_db_engine(config)
except dbengine.MakeDbException as e:
print(e)
return 1
server_id = config.get("server_id", DEFAULT_SERVER_ID)
if opts.user:
uid = uidconverter.username_to_uid(opts.user, server_id, server_id, engine)
if uid is None:
print(f"User {opts.user}@{server_id} not found")
return 1
else:
uid = opts.id
start, end = _calculate_period(opts)
if start is None and end is None:
return 1
lve_read_snapshot = LVEReadSnaphot(
engine,
start,
end,
uid,
server_id,
opts.output,
opts.json,
opts.compact,
opts.snap_sql_item,
)
    if opts.list:
        lve_read_snapshot.list()  # print the list of snapshot timestamps
    elif opts.stats:
        lve_read_snapshot.stats(opts.unit)
    else:
        lve_read_snapshot.run()
    return 0
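
# Illustrative invocation (a sketch: the exact flag set is defined by
# lve_read_snapshot_parser, and the installed command name may differ):
#
#   lve-read-snapshot --id 1000 --from "2019-01-01 00:00" --to "2019-01-02 00:00" --json
#
# snapshot_reader_main(config) parses these flags and prints the snapshots
# collected for lve id 1000 in that interval as a single JSON document.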
def _try_convert_to_timestamp(o):
"""
    Convert a local datetime to a unix timestamp; values that are already
    unix timestamps are passed through unchanged.
    :param o: datetime.datetime or a unix timestamp (int/float)
    :return: unix timestamp
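    Example (illustrative):
        _try_convert_to_timestamp(datetime.datetime(2019, 1, 1)) returns the
        corresponding unix timestamp, while _try_convert_to_timestamp(1546300800)
        returns 1546300800 unchanged.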
"""
if isinstance(o, datetime.datetime):
return dateutil.gm_datetime_to_unixtimestamp(dateutil.local_to_gm(o))
return o
class LVEReadSnaphot:
def __init__(
self,
engine,
start,
end,
uid,
server_id,
output_file,
do_json,
compact_mode=False,
snap_sql_item=None,
):
"""
:param start: datetime.datetime | int (unix timestamp)
:param end: datetime.datetime | int (unix timestamp)
:param uid:
:param server_id:
:param output_file: filename
:param do_json: boolean
:param compact_mode: boolean - truncate SQL queries for memory efficiency
:param snap_sql_item: int - index of specific SQL query to return (only with --timestamp)
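        Example construction (illustrative values only):
            LVEReadSnaphot(engine, start=1546300800, end=1546387200, uid=1000,
                           server_id="localhost", output_file=None,
                           do_json=True).run()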
"""
self.engine = engine
self.do_json = do_json
self.output_file = output_file
self.uid = uid
self.start = _try_convert_to_timestamp(start)
self.end = _try_convert_to_timestamp(end)
self.server_id = server_id
self.compact_mode = compact_mode
self.snap_sql_item = snap_sql_item
        # Mapping of fault counter keys to human-readable names
self.fault_names = {
"cpu_fault": "CPU",
"mem_fault": "Virtual memory",
"mep_fault": "EP",
"memphy_fault": "Physical memory",
"nproc_fault": "NPROC",
"io_fault": "IO",
"iops_fault": "IOPS",
}
def truncate_sql_queries(self, snap_sql, max_length=200):
"""
Truncate SQL queries for compact mode to reduce memory usage.
:param snap_sql: List of SQL query arrays [cmd, time, sql_query]
:param max_length: Maximum length for SQL query strings
:return: List of truncated SQL query arrays
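        Example (illustrative): with max_length=10 the entry
        ["mysql", "0.5", "SELECT id, name FROM users"] is truncated to
        ["mysql", "0.5", "SELECT id,..."].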
"""
if not snap_sql:
return snap_sql
truncated = []
for query_entry in snap_sql:
            # well-formed entry: [cmd, time, sql_query]
if len(query_entry) >= 3:
cmd, time, sql = query_entry[0], query_entry[1], query_entry[2]
if len(sql) > max_length:
sql = sql[:max_length] + "..."
truncated.append([cmd, time, sql])
else:
# preserve as-is if unexpected format
truncated.append(query_entry)
return truncated
def _handle_sql_item_request(self, snapshots_generator, out):
"""
Handle --snap-sql-item request: return specific SQL query by index.
:param snapshots_generator: Generator of snapshot data
:param out: Output stream
:return: None (writes JSON directly to output)
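        Example success payload (shape only; values illustrative):
            {"success": 1, "data": ["mysql", "0.5", "SELECT id FROM users"]}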
"""
snapshots_list = list(snapshots_generator)
if not snapshots_list:
out.write('{"success": 0, "error": "No snapshot found for the specified timestamp"}\n')
out.flush()
return
if len(snapshots_list) > 1:
out.write('{"success": 0, "error": "--snap-sql-item only works with --timestamp for single snapshots"}\n')
out.flush()
return
snapshot_data = snapshots_list[0]
snap_sql = snapshot_data.get("snap_sql", [])
        if self.snap_sql_item < 0 or self.snap_sql_item >= len(snap_sql):
            error_msg = f"SQL item index {self.snap_sql_item} out of range. Available indices: 0-{len(snap_sql) - 1}"
            # serialize via json.dumps so the payload stays valid JSON
            # regardless of the message content
            out.write(json.dumps({"success": 0, "error": error_msg}))
            out.write("\n")
            out.flush()
            return
sql_item = snap_sql[self.snap_sql_item]
result = {"success": 1, "data": sql_item}
out.write(json.dumps(result, indent=2))
out.write("\n")
out.flush()
def get_incidents(self, session):
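        """
        Fetch incidents for this uid/server_id whose start or end time falls
        within [self.start, self.end], ordered by incident start time.
        """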
return (
session.query(incident)
.filter(
incident.uid == self.uid,
incident.server_id == self.server_id,
or_(
incident.incident_start_time.between(self.start, self.end),
incident.incident_end_time.between(self.start, self.end),
),
)
.order_by(incident.incident_start_time)
.all()
)
def stats_by_incident(self, session):
result = []
for i in self.get_incidents(session):
result.append(
{
"from": i.incident_start_time,
"to": max(i.dump_time, i.incident_end_time or 0),
"incidents": 1,
"snapshots": i.snapshot_count,
"duration": self.get_duration(i),
}
)
return result
@staticmethod
def get_duration(i, from_=0, to_=sys.maxsize):
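        """
        Duration (in seconds) of incident ``i`` clipped to the window
        [from_, to_]. The effective incident end is
        max(i.dump_time, i.incident_end_time or 0), since an incident that is
        still open has no end time yet.
        """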
from_ = max(i.incident_start_time, from_)
to_ = min(max(i.dump_time, i.incident_end_time or 0), to_)
return to_ - from_
@staticmethod
def get_incident_count(incidents, pos, from_ts, to_ts):
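        """
        Count the incidents overlapping [from_ts, to_ts], scanning the
        start-time-ordered ``incidents`` list from index ``pos``.
        Returns (count, summed overlap duration, next scan position); when at
        least one incident matched, the index of the last matched incident is
        returned so an incident spanning the window boundary is revisited for
        the next window.
        """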
count = 0
duration = 0
while pos < len(incidents):
i = incidents[pos]
if i.dump_time < from_ts:
pos += 1
continue
if i.incident_start_time > to_ts:
break # we are done
count += 1
pos += 1
duration += LVEReadSnaphot.get_duration(i, from_ts, to_ts)
if count == 0:
return 0, 0, pos
else:
return count, duration, pos - 1
def stats_by_time_unit(self, session, time_unit):
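        """
        Group incidents into consecutive buckets of ``time_unit`` between
        self.start and self.end. Buckets without incidents are skipped; each
        reported bucket carries its incident count, the number of snapshots
        taken within it, and the summed incident duration.
        """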
incidents = self.get_incidents(session)
snapshot_files = Snapshot({"uid": self.uid}).get_file_list()
result = []
from_ts = self.start
pos = 0
while from_ts < self.end:
to_ts = min(from_ts + time_unit, self.end)
incident_count, duration, pos = self.get_incident_count(incidents, pos, from_ts, to_ts)
if incident_count == 0: # skip this one, we have nothing here
from_ts = to_ts
continue
snapshots = Snapshot.snapshot_filter(snapshot_files, from_ts, to_ts)
            if len(snapshots) == 0:
                snapshots.append(0)  # always report at least one snapshot for an incident
result.append(
{
"from": from_ts,
"to": to_ts,
"incidents": incident_count,
"snapshots": len(snapshots),
"duration": duration,
}
)
from_ts = to_ts
return result
def print_stats_json(self, stats):
data = {"from": self.start, "to": self.end, "stats": stats}
out = self.open()
out.write(prepare_data_json(data))
out.write("\n")
out.flush()
def print_stats(self, stats):
out = self.open()
out.write(f"Stats from {dateutil.ts_to_iso(self.start)} to {dateutil.ts_to_iso(self.end)}\n")
for stat in stats:
out.write("---\n")
out.write(f"\tfrom: {dateutil.ts_to_iso(stat['from'])}\n")
out.write(f"\tto: {dateutil.ts_to_iso(stat['to'])}\n")
out.write(f"\tincidents: {stat['incidents']}\n")
out.write(f"\tsnapshots: {stat['snapshots']}\n")
out.write(f"\tduration: {stat['duration']} sec.\n")
out.flush()
def stats(self, stats_unit_str):
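        """
        Print fault/snapshot statistics for the selected period, grouped
        either by a fixed time unit (as parsed by dateutil.parse_period2) or,
        when stats_unit_str is "auto", by individual incidents.
        """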
try:
time_unit = dateutil.parse_period2(stats_unit_str)
            if self.end - self.start < time_unit:
                # widen the range to cover at least one full unit (e.g. stats
                # requested for the last 10 minutes but grouped by 1 day)
                self.start = self.end - time_unit
group_by_incident = False
except ValueError:
time_unit = None
group_by_incident = stats_unit_str == "auto"
if not group_by_incident:
raise
session = sessionmaker(bind=self.engine)()
try:
if group_by_incident:
stats = self.stats_by_incident(session)
else:
stats = self.stats_by_time_unit(session, time_unit)
session.expunge_all()
if self.do_json:
self.print_stats_json(stats)
else:
self.print_stats(stats)
finally:
session.close()
def run(self):
snapshots_obj = Snapshot({"uid": self.uid})
self.report(snapshots_obj.get_snapshots(self.start, self.end))
def list(self):
snapshots = Snapshot({"uid": self.uid})
snapshots_list = snapshots.get_ts_list(self.start, self.end)
out = self.open()
if self.do_json:
out.write(prepare_data_json(snapshots_list))
else:
out.write(
f"Snapshots timestamp list; from {dateutil.ts_to_iso(self.start)} "
f"to {dateutil.ts_to_iso(self.end)} for lve id {self.uid}\n"
)
for ts in snapshots_list:
out.write(dateutil.ts_to_iso(ts))
out.write("\n")
out.write(REPORT_FOOTER)
out.flush()
def report(self, snapshots_generator):
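        """
        Write the snapshot report to the selected output: a single JSON
        document when --json was requested (optionally a single SQL entry
        selected via --snap-sql-item), otherwise a human-readable text report.
        """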
out = self.open()
if self.do_json:
# Handle --snap-sql-item request (single SQL query by index)
if self.snap_sql_item is not None:
self._handle_sql_item_request(snapshots_generator, out)
return
# Normal snapshot list output
out.write('{"success": 1, "data": {"snapshots": [')
first_snapshot = True
for snapshot_data in snapshots_generator:
# Apply SQL truncation in compact mode
if self.compact_mode and "snap_sql" in snapshot_data:
snapshot_data["snap_sql"] = self.truncate_sql_queries(snapshot_data["snap_sql"])
if not first_snapshot:
out.write(", ")
else:
first_snapshot = False
out.write(json.dumps(snapshot_data))
out.write("]}}")
out.write("\n")
out.flush()
return
out.write(
REPORT_HEADER % (dateutil.ts_to_iso(self.start), dateutil.ts_to_iso(self.end), self.uid, self.server_id)
)
snapshot_count = 0
for snapshot_data in snapshots_generator:
# Apply SQL truncation in compact mode
if self.compact_mode and "snap_sql" in snapshot_data:
snapshot_data["snap_sql"] = self.truncate_sql_queries(snapshot_data["snap_sql"])
self.format_snapshot(out, snapshot_data)
snapshot_count += 1
if snapshot_count == 0:
out.write("No snapshots found for the specified time period.\n")
out.write(REPORT_FOOTER)
out.flush()
def open(self):
if self.output_file:
try:
return open(self.output_file, "w", encoding="utf-8")
            except IOError:
                # FIXME: failing to open the requested output file is an
                # error; report it instead of silently falling back to stdout
                pass
        return sys.stdout
@staticmethod
def _process_data_aggregate(process_data):
"""
        Aggregate per-thread process data by PID, summing CPU % and MEM across
        threads of the same process.
:param process_data: input data. Dictionary:
{
u'151048': {u'MEM': u'1', u'CMD': u'bash', u'PID': u'151048', u'CPU': u'0%'},
u'151047': {u'MEM': u'1', u'CMD': u'su cltest1', u'PID': u'151047', u'CPU': u'0%'},
u'153642': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'0%'},
u'153641': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'0%'},
u'153640': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'5%'}
}
:return: Output data - List of dictionaries:
[
{u'MEM': u'1', u'CMD': u'bash', u'PID': u'151048', u'CPU': u'0%'},
{u'MEM': u'1', u'CMD': u'su cltest1', u'PID': u'151047', u'CPU': u'0%'},
{u'MEM': u'3', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'5%'},
]
"""
# 1. Build thread dictionary as
# pid: {'PID', 'CMD', 'MEM', 'CPU'}
# and aggregate data
thread_dict = {}
        for proc_data in process_data.values():
if "PID" not in proc_data:
# old format snapshot, do not aggregate it
# Example of old format snapshot:
# {u'31228': {u'MEM': u'1', u'CMD': u'31228', u'IOPS': u'N/A', u'CPU': u'1%', u'IO': u'N/A'}}
pid = proc_data["CMD"]
process_data_new = {}
process_data_new["PID"] = pid
process_data_new["MEM"] = proc_data["MEM"]
process_data_new["CMD"] = pid
process_data_new["CPU"] = proc_data["CPU"]
thread_dict[pid] = process_data_new
continue
pid = proc_data["PID"]
# remove '%' from CPU value and convert CPU/MEM to integers
if proc_data["CPU"] != "N/A":
proc_data["CPU"] = int(proc_data["CPU"].replace("%", ""))
if proc_data["MEM"] != "N/A":
proc_data["MEM"] = int(proc_data["MEM"])
if pid in thread_dict:
# PID already present, add new data to it
if proc_data["CPU"] != "N/A":
if thread_dict[pid]["CPU"] != "N/A":
thread_dict[pid]["CPU"] += proc_data["CPU"]
else:
thread_dict[pid]["CPU"] = proc_data["CPU"]
if proc_data["MEM"] != "N/A":
if thread_dict[pid]["MEM"] != "N/A":
thread_dict[pid]["MEM"] += proc_data["MEM"]
else:
thread_dict[pid]["MEM"] = proc_data["MEM"]
else:
# PID absent, add it
thread_dict[pid] = proc_data
# 2. Build output list
out_data = list(thread_dict.values())
return out_data
def format_snapshot(self, out, snapshot_data):
out.write(f">>> {dateutil.ts_to_iso(snapshot_data['dump_time'])}, UID {snapshot_data['uid']}\n")
out.write("\nFaults:\n")
for k, v in snapshot_data["snap_faults"].items():
out.write(f"\t* {self.fault_names.get(k, k)}: {v}\n")
if snapshot_data["snap_sql"]:
out.write("\nSQL Queries:\n")
            sql_table = prettytable.PrettyTable(["CMD", "Time", "SQL-query"])
            for row in snapshot_data["snap_sql"]:
                sql_table.add_row(row)
            out.write(sql_table.get_string())
out.write("\nProcesses:\n")
# fields = ('PID', 'COM', 'SPEED', 'MEM', 'IO', 'IOPS')
# table = prettytable.PrettyTable(fields=fields)
        fields = set()
        for data in snapshot_data["snap_proc"].values():
            fields.update(data.keys())
        # Keys used to extract row data
        data_keys = ["PID"]
# Form table header: PID, CMD, Memory (Mb), CPU (%)
table_columns = ["PID"]
if "MEM" in fields:
table_columns.append("Memory (Mb)")
data_keys.append("MEM")
if "CPU" in fields:
table_columns.append("CPU (%)")
data_keys.append("CPU")
if "CMD" in fields:
table_columns.append("CMD")
data_keys.append("CMD")
table = prettytable.PrettyTable(table_columns)
        # Left-align the CMD column, if it is present
if "CMD" in table_columns:
table.align["CMD"] = "l"
        # Aggregate process data (sum CPU/MEM across all threads of the same process)
snap_proc_aggr = self._process_data_aggregate(snapshot_data["snap_proc"])
for data in snap_proc_aggr:
table.add_row([data.get(k, "N/A") for k in data_keys])
out.write(str(table))
out.write("\n\n")
if snapshot_data["snap_http"]:
out.write("Http requests:\n")
            http_table = prettytable.PrettyTable(["Pid", "Domain", "Http type", "Path", "Http version", "Time"])
            for row in snapshot_data["snap_http"]:
                http_table.add_row(row)
            out.write(str(http_table))
            out.write("\n\n")