HEX
Server: LiteSpeed
System: Linux premium195.web-hosting.com 4.18.0-553.54.1.lve.el8.x86_64 #1 SMP Wed Jun 4 13:01:13 UTC 2025 x86_64
User: dulandesilva (1215)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/snapshots/reader.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import datetime
import json
import sys

import lvestats.lib.commons.decorators
import prettytable
from lvestats.lib import dbengine, uidconverter
from lvestats.lib.commons import dateutil
from lvestats.lib.jsonhandler import prepare_data_json
from lvestats.lib.parsers.lve_read_snapshot_argparse import lve_read_snapshot_parser
from lvestats.lib.snapshot import Snapshot
from lvestats.orm.incident import incident
from sqlalchemy import or_
from sqlalchemy.orm import sessionmaker

REPORT_HEADER = "Snapshots collected starting from %s to %s for lve id %d @ %s:\n"
REPORT_FOOTER = "Done..\n"

DEFAULT_SERVER_ID = "localhost"  # used when nothing is configured or specified in cmdline


def _calculate_period(opts):
    if opts.period:
        start, end = opts.period
    elif opts.timestamp:
        start = opts.timestamp
        end = start + 0.999999
    else:
        try:
            start = dateutil.parse_date(" ".join(opts.ffrom))
            end = dateutil.parse_date(" ".join(opts.to))
        except ValueError:
            print("please use [YY]YY-MM-DD[ HH:MM] format for --from and --to")
            return None, None
    return start, end


@lvestats.lib.commons.decorators.no_sigpipe
def snapshot_reader_main(config, argv_=None):
    parser = lve_read_snapshot_parser()
    opts = parser.parse_args(argv_)

    if not opts.id and not opts.user:
        parser.print_help()
        print("One of -u --user or -i --id should be specified")
        return 1

    if opts.snap_sql_item is not None and (not opts.timestamp or not opts.json):
        print("--snap-sql-item can only be used with --timestamp and --json options")
        return 1

    try:
        engine = dbengine.make_db_engine(config)
    except dbengine.MakeDbException as e:
        print(e)
        return 1

    server_id = config.get("server_id", DEFAULT_SERVER_ID)

    if opts.user:
        uid = uidconverter.username_to_uid(opts.user, server_id, server_id, engine)
        if uid is None:
            print(f"User {opts.user}@{server_id} not found")
            return 1
    else:
        uid = opts.id
    start, end = _calculate_period(opts)
    if start is None and end is None:
        return 1
    lve_read_snapshot = LVEReadSnaphot(
        engine,
        start,
        end,
        uid,
        server_id,
        opts.output,
        opts.json,
        opts.compact,
        opts.snap_sql_item,
    )
    if opts.list:
        lve_read_snapshot.list()  # show snapshots timestamps list
    elif opts.stats:
        lve_read_snapshot.stats(opts.unit)
    else:
        lve_read_snapshot.run()


def _try_convert_to_timestamp(o):
    """
    Convert local datetime to unix timestamp, or just passes
    unix timestamp as output if specified.

    :param o:
    :return:
    """
    if isinstance(o, datetime.datetime):
        return dateutil.gm_datetime_to_unixtimestamp(dateutil.local_to_gm(o))
    return o


class LVEReadSnaphot:
    def __init__(
        self,
        engine,
        start,
        end,
        uid,
        server_id,
        output_file,
        do_json,
        compact_mode=False,
        snap_sql_item=None,
    ):
        """

        :param start: datetime.datetime | int (unix timestamp)
        :param end: datetime.datetime | int (unix timestamp)
        :param uid:
        :param server_id:
        :param output_file: filename
        :param do_json: boolean
        :param compact_mode: boolean - truncate SQL queries for memory efficiency
        :param snap_sql_item: int - index of specific SQL query to return (only with --timestamp)
        """
        self.engine = engine
        self.do_json = do_json
        self.output_file = output_file
        self.uid = uid
        self.start = _try_convert_to_timestamp(start)
        self.end = _try_convert_to_timestamp(end)
        self.server_id = server_id
        self.compact_mode = compact_mode
        self.snap_sql_item = snap_sql_item
        # Faults names dictionary
        self.fault_names = {
            "cpu_fault": "CPU",
            "mem_fault": "Virtual memory",
            "mep_fault": "EP",
            "memphy_fault": "Physical memory",
            "nproc_fault": "NPROC",
            "io_fault": "IO",
            "iops_fault": "IOPS",
        }

    def truncate_sql_queries(self, snap_sql, max_length=200):
        """
        Truncate SQL queries for compact mode to reduce memory usage.

        :param snap_sql: List of SQL query arrays [cmd, time, sql_query]
        :param max_length: Maximum length for SQL query strings
        :return: List of truncated SQL query arrays
        """
        if not snap_sql:
            return snap_sql

        truncated = []
        for query_entry in snap_sql:
            # has SQL query at index 2
            if len(query_entry) >= 3:
                cmd, time, sql = query_entry[0], query_entry[1], query_entry[2]
                if len(sql) > max_length:
                    sql = sql[:max_length] + "..."
                truncated.append([cmd, time, sql])
            else:
                # preserve as-is if unexpected format
                truncated.append(query_entry)
        return truncated

    def _handle_sql_item_request(self, snapshots_generator, out):
        """
        Handle --snap-sql-item request: return specific SQL query by index.

        :param snapshots_generator: Generator of snapshot data
        :param out: Output stream
        :return: None (writes JSON directly to output)
        """
        snapshots_list = list(snapshots_generator)

        if not snapshots_list:
            out.write('{"success": 0, "error": "No snapshot found for the specified timestamp"}\n')
            out.flush()
            return

        if len(snapshots_list) > 1:
            out.write('{"success": 0, "error": "--snap-sql-item only works with --timestamp for single snapshots"}\n')
            out.flush()
            return

        snapshot_data = snapshots_list[0]
        snap_sql = snapshot_data.get("snap_sql", [])

        if self.snap_sql_item < 0 or self.snap_sql_item >= len(snap_sql):
            error_msg = f"SQL item index {self.snap_sql_item} out of range. Available indices: 0-{len(snap_sql) - 1}"
            out.write(f'{{"success": 0, "error": "{error_msg}"}}\n')
            out.flush()
            return

        sql_item = snap_sql[self.snap_sql_item]
        result = {"success": 1, "data": sql_item}

        out.write(json.dumps(result, indent=2))
        out.write("\n")
        out.flush()

    def get_incidents(self, session):
        return (
            session.query(incident)
            .filter(
                incident.uid == self.uid,
                incident.server_id == self.server_id,
                or_(
                    incident.incident_start_time.between(self.start, self.end),
                    incident.incident_end_time.between(self.start, self.end),
                ),
            )
            .order_by(incident.incident_start_time)
            .all()
        )

    def stats_by_incident(self, session):
        result = []
        for i in self.get_incidents(session):
            result.append(
                {
                    "from": i.incident_start_time,
                    "to": max(i.dump_time, i.incident_end_time or 0),
                    "incidents": 1,
                    "snapshots": i.snapshot_count,
                    "duration": self.get_duration(i),
                }
            )
        return result

    @staticmethod
    def get_duration(i, from_=0, to_=sys.maxsize):
        from_ = max(i.incident_start_time, from_)
        to_ = min(max(i.dump_time, i.incident_end_time or 0), to_)
        return to_ - from_

    @staticmethod
    def get_incident_count(incidents, pos, from_ts, to_ts):
        count = 0
        duration = 0
        while pos < len(incidents):
            i = incidents[pos]
            if i.dump_time < from_ts:
                pos += 1
                continue
            if i.incident_start_time > to_ts:
                break  # we are done
            count += 1
            pos += 1
            duration += LVEReadSnaphot.get_duration(i, from_ts, to_ts)
        if count == 0:
            return 0, 0, pos
        else:
            return count, duration, pos - 1

    def stats_by_time_unit(self, session, time_unit):
        incidents = self.get_incidents(session)
        snapshot_files = Snapshot({"uid": self.uid}).get_file_list()
        result = []
        from_ts = self.start
        pos = 0
        while from_ts < self.end:
            to_ts = min(from_ts + time_unit, self.end)
            incident_count, duration, pos = self.get_incident_count(incidents, pos, from_ts, to_ts)
            if incident_count == 0:  # skip this one, we have nothing here
                from_ts = to_ts
                continue
            snapshots = Snapshot.snapshot_filter(snapshot_files, from_ts, to_ts)
            if len(snapshots) == 0:
                snapshots.append(0)  # always show like there is at least one snapshot
            result.append(
                {
                    "from": from_ts,
                    "to": to_ts,
                    "incidents": incident_count,
                    "snapshots": len(snapshots),
                    "duration": duration,
                }
            )
            from_ts = to_ts
        return result

    def print_stats_json(self, stats):
        data = {"from": self.start, "to": self.end, "stats": stats}

        out = self.open()
        out.write(prepare_data_json(data))
        out.write("\n")
        out.flush()

    def print_stats(self, stats):
        out = self.open()

        out.write(f"Stats from {dateutil.ts_to_iso(self.start)} to {dateutil.ts_to_iso(self.end)}\n")
        for stat in stats:
            out.write("---\n")
            out.write(f"\tfrom: {dateutil.ts_to_iso(stat['from'])}\n")
            out.write(f"\tto: {dateutil.ts_to_iso(stat['to'])}\n")
            out.write(f"\tincidents: {stat['incidents']}\n")
            out.write(f"\tsnapshots: {stat['snapshots']}\n")
            out.write(f"\tduration: {stat['duration']} sec.\n")
        out.flush()

    def stats(self, stats_unit_str):
        try:
            time_unit = dateutil.parse_period2(stats_unit_str)
            if self.end - self.start < time_unit:
                # this prevents situations when we get stats for last 10 minutes, but group it by 1 day
                self.start = self.end - time_unit
            group_by_incident = False
        except ValueError:
            time_unit = None
            group_by_incident = stats_unit_str == "auto"
            if not group_by_incident:
                raise

        session = sessionmaker(bind=self.engine)()
        try:
            if group_by_incident:
                stats = self.stats_by_incident(session)
            else:
                stats = self.stats_by_time_unit(session, time_unit)

            session.expunge_all()

            if self.do_json:
                self.print_stats_json(stats)
            else:
                self.print_stats(stats)
        finally:
            session.close()

    def run(self):
        snapshots_obj = Snapshot({"uid": self.uid})
        self.report(snapshots_obj.get_snapshots(self.start, self.end))

    def list(self):
        snapshots = Snapshot({"uid": self.uid})
        snapshots_list = snapshots.get_ts_list(self.start, self.end)
        out = self.open()
        if self.do_json:
            out.write(prepare_data_json(snapshots_list))
        else:
            out.write(
                f"Snapshots timestamp list; from {dateutil.ts_to_iso(self.start)} "
                f"to {dateutil.ts_to_iso(self.end)} for lve id {self.uid}\n"
            )
            for ts in snapshots_list:
                out.write(dateutil.ts_to_iso(ts))
                out.write("\n")
            out.write(REPORT_FOOTER)
        out.flush()

    def report(self, snapshots_generator):
        out = self.open()

        if self.do_json:
            # Handle --snap-sql-item request (single SQL query by index)
            if self.snap_sql_item is not None:
                self._handle_sql_item_request(snapshots_generator, out)
                return

            # Normal snapshot list output
            out.write('{"success": 1, "data": {"snapshots": [')
            first_snapshot = True

            for snapshot_data in snapshots_generator:
                # Apply SQL truncation in compact mode
                if self.compact_mode and "snap_sql" in snapshot_data:
                    snapshot_data["snap_sql"] = self.truncate_sql_queries(snapshot_data["snap_sql"])

                if not first_snapshot:
                    out.write(", ")
                else:
                    first_snapshot = False

                out.write(json.dumps(snapshot_data))

            out.write("]}}")
            out.write("\n")
            out.flush()
            return

        out.write(
            REPORT_HEADER % (dateutil.ts_to_iso(self.start), dateutil.ts_to_iso(self.end), self.uid, self.server_id)
        )

        snapshot_count = 0
        for snapshot_data in snapshots_generator:
            # Apply SQL truncation in compact mode
            if self.compact_mode and "snap_sql" in snapshot_data:
                snapshot_data["snap_sql"] = self.truncate_sql_queries(snapshot_data["snap_sql"])

            self.format_snapshot(out, snapshot_data)
            snapshot_count += 1

        if snapshot_count == 0:
            out.write("No snapshots found for the specified time period.\n")

        out.write(REPORT_FOOTER)
        out.flush()

    def open(self):
        if self.output_file:
            try:
                return open(self.output_file, "w", encoding="utf-8")
            except IOError:
                pass  # maybe need error message
                # fixme --> if we are trying to write to a file, and cannot,
                # this is an error, we shouldn't write to stdout
        return sys.stdout

    @staticmethod
    def _process_data_aggregate(process_data):
        """
        Aggregate process data by PID by summing CPU % and MEM for same PIDs

        :param process_data: input data. Dictionary:
        {
            u'151048': {u'MEM': u'1', u'CMD': u'bash', u'PID': u'151048', u'CPU': u'0%'},
            u'151047': {u'MEM': u'1', u'CMD': u'su cltest1', u'PID': u'151047', u'CPU': u'0%'},
            u'153642': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'0%'},
            u'153641': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'0%'},
            u'153640': {u'MEM': u'1', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'5%'}
        }

        :return: Output data - List of dictionaries:
        [
            {u'MEM': u'1', u'CMD': u'bash', u'PID': u'151048', u'CPU': u'0%'},
            {u'MEM': u'1', u'CMD': u'su cltest1', u'PID': u'151047', u'CPU': u'0%'},
            {u'MEM': u'3', u'CMD': u'./threads', u'PID': u'153640', u'CPU': u'5%'},
        ]
        """
        # 1. Build thread dictionary as
        #  pid: {'PID', 'CMD', 'MEM', 'CPU'}
        #  and aggregate data
        thread_dict = {}
        for _, proc_data in process_data.items():
            if "PID" not in proc_data:
                # old format snapshot, do not aggregate it
                # Example of old format snapshot:
                # {u'31228': {u'MEM': u'1', u'CMD': u'31228', u'IOPS': u'N/A', u'CPU': u'1%', u'IO': u'N/A'}}
                pid = proc_data["CMD"]
                process_data_new = {}
                process_data_new["PID"] = pid
                process_data_new["MEM"] = proc_data["MEM"]
                process_data_new["CMD"] = pid
                process_data_new["CPU"] = proc_data["CPU"]
                thread_dict[pid] = process_data_new
                continue
            pid = proc_data["PID"]
            # remove '%' from CPU value and convert CPU/MEM to integers
            if proc_data["CPU"] != "N/A":
                proc_data["CPU"] = int(proc_data["CPU"].replace("%", ""))
            if proc_data["MEM"] != "N/A":
                proc_data["MEM"] = int(proc_data["MEM"])
            if pid in thread_dict:
                # PID already present, add new data to it
                if proc_data["CPU"] != "N/A":
                    if thread_dict[pid]["CPU"] != "N/A":
                        thread_dict[pid]["CPU"] += proc_data["CPU"]
                    else:
                        thread_dict[pid]["CPU"] = proc_data["CPU"]
                if proc_data["MEM"] != "N/A":
                    if thread_dict[pid]["MEM"] != "N/A":
                        thread_dict[pid]["MEM"] += proc_data["MEM"]
                    else:
                        thread_dict[pid]["MEM"] = proc_data["MEM"]
            else:
                # PID absent, add it
                thread_dict[pid] = proc_data
        # 2. Build output list
        out_data = list(thread_dict.values())
        return out_data

    def format_snapshot(self, out, snapshot_data):
        out.write(f">>> {dateutil.ts_to_iso(snapshot_data['dump_time'])}, UID {snapshot_data['uid']}\n")

        out.write("\nFaults:\n")
        for k, v in snapshot_data["snap_faults"].items():
            out.write(f"\t* {self.fault_names.get(k, k)}: {v}\n")

        if snapshot_data["snap_sql"]:
            out.write("\nSQL Queries:\n")
            sql_table = prettytable.PrettyTable(["CMD", "Time", "SQL-query"])
            list(map(sql_table.add_row, snapshot_data["snap_sql"]))
            out.write(sql_table.get_string())

        out.write("\nProcesses:\n")
        # fields = ('PID', 'COM', 'SPEED', 'MEM', 'IO', 'IOPS')
        # table = prettytable.PrettyTable(fields=fields)
        fields = set()
        for data in list(snapshot_data["snap_proc"].values()):
            for key in list(data.keys()):
                fields.add(key)

        # Keys list for data extacting
        data_keys = list(["PID"])
        # Form table header: PID, CMD, Memory (Mb), CPU (%)
        table_columns = ["PID"]
        if "MEM" in fields:
            table_columns.append("Memory (Mb)")
            data_keys.append("MEM")
        if "CPU" in fields:
            table_columns.append("CPU (%)")
            data_keys.append("CPU")
        if "CMD" in fields:
            table_columns.append("CMD")
            data_keys.append("CMD")
        table = prettytable.PrettyTable(table_columns)
        # Left align for CMD column, if it present
        if "CMD" in table_columns:
            table.align["CMD"] = "l"
        # Do process data aggregation (CPU/MEM summing for all threads of same process)
        snap_proc_aggr = self._process_data_aggregate(snapshot_data["snap_proc"])
        for data in snap_proc_aggr:
            table.add_row([data.get(k, "N/A") for k in data_keys])

        out.write(str(table))
        out.write("\n\n")

        if snapshot_data["snap_http"]:
            out.write("Http requests:\n")
            http_table = prettytable.PrettyTable(["Pid", "Domain", "Http type", "Path", "Http version", "Time"])
            list(map(http_table.add_row, snapshot_data["snap_http"]))
            out.write(str(http_table))
            out.write("\n\n")