HEX
Server: LiteSpeed
System: Linux premium195.web-hosting.com 4.18.0-553.54.1.lve.el8.x86_64 #1 SMP Wed Jun 4 13:01:13 UTC 2025 x86_64
User: dulandesilva (1215)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/lib/snapshot.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import json
import logging
import os
import pwd
import shutil
import sys
import zlib
from collections.abc import Iterator
from contextlib import contextmanager
from typing import BinaryIO

SNAPSHOT_PATH = "/var/lve/snapshots"
SNAPSHOT_EXT = ".snapshot"
SNAPSHOT_EXT_LEN = len(SNAPSHOT_EXT)


class Snapshot:
    """
    Class responsible for loading and saving snapshot files for the interval.

    The files will be saved in a format of: `/var/lve/snapshots/[uid]/dump_time.snapshot`

    dump_time is the timestamp/integer.

    The directories `/var/lve/snapshots/[uid]` and the dump files themselves will be owned by user.
    They will not be readable by other users.
    """

    def __init__(self, incident: dict, compresslevel: int = 1) -> None:
        self.compresslevel = compresslevel
        self.incident = incident
        self.log = logging.getLogger("lib-snapshot")

    def save(self, data: dict) -> str:
        dump_date = data["dump_time"]
        assert dump_date is not None

        # convert possible non-ascii data to unicode
        self._replace_unicode_data(data)
        json_compressed = zlib.compress(json.dumps(data).encode(), self.compresslevel)
        with self.create_file(dump_date) as f:
            f.write(json_compressed)
            self.log.debug("Snapshot dumped to file %s", f.name)
        return f.name

    @staticmethod
    def _to_unicode(obj):
        if isinstance(obj, bytes):
            return obj.decode("utf-8", "replace")
        return obj

    def _replace_unicode_data(self, data: dict) -> None:
        u_queries = []
        for query in data.get("snap_sql", []):
            u_queries.append(list(map(self._to_unicode, query)))
        data["snap_sql"] = u_queries

        u_urls = []
        for http in data.get("snap_http", []):
            u_urls.append(list(map(self._to_unicode, http)))
        data["snap_http"] = u_urls

    def get_file_list(self) -> list[str]:
        dir_ = self.get_dir()
        if os.path.isdir(dir_):
            return os.listdir(dir_)
        return []

    def get_ts_list(
        self,
        from_ts: int | None,
        to_ts: int | None,
    ) -> list[int]:
        """
        Return ordered list of timestamps when snapshots for this user were created.

        :param from_ts: Start timestamp for filtering snapshots (inclusive).
        If None, starts from the earliest snapshot.
        :param to_ts: End timestamp for filtering snapshots (inclusive).
        If None, includes all snapshots up to the latest.
        :return: List of timestamps ordered for the given period.
        """
        return self.snapshot_filter(self.get_file_list(), from_ts, to_ts)

    def get_snapshots(
        self,
        from_ts: int | None,
        to_ts: int | None,
    ) -> Iterator[dict]:
        """
        Get snapshots for a given period.

        Yields one snapshot at a time for memory efficiency.

        :param from_ts: Start timestamp for filtering snapshots (inclusive).
        If None, starts from the earliest snapshot.
        :param to_ts: End timestamp for filtering snapshots (inclusive).
        If None, includes all snapshots up to the latest.
        :return: Iterator of snapshot dictionaries.
        """
        for ts in self.get_ts_list(from_ts, to_ts):
            try:
                filename = self.get_file_name(self.ts_to_name(ts))
                if not os.geteuid():
                    with drop_privileges(self.incident["uid"]):
                        content = self.read_file_content(filename)
                else:
                    content = self.read_file_content(filename)

                snapshot_data = json.loads(content)
                yield snapshot_data

            except (ValueError, OSError) as ve:
                self.log.warning(
                    "Corrupted file: %s (%s)",
                    self.get_file_name(self.ts_to_name(ts)),
                    str(ve),
                )

    def read_file_content(self, filename: str) -> str:
        with open(filename, "rb") as f:
            content = f.read()
        try:
            content = zlib.decompress(content)
        except zlib.error:
            compressed_content = zlib.compress(content, self.compresslevel)
            with open(filename, "wb") as f:
                f.write(compressed_content)
        return content.decode()

    def get_incident_snapshots(self) -> list:
        """
        Load all snapshots for given incident
        :return: list of snapshots
        """
        return list(self.get_snapshots(self.incident["incident_start_time"], self.incident["incident_end_time"]))

    def get_dir(self) -> str:
        return os.path.join(SNAPSHOT_PATH, str(self.incident["uid"]))

    def get_file_name(self, name: str) -> str:
        return os.path.join(self.get_dir(), name)

    def create_file(self, dump_date: int) -> BinaryIO:
        """
        create file, change its ownership & permissions if needed. Create directories if needed as well
        :param dump_date: int timestamp used as file name
        :return: open File object
        """
        dir_ = self.get_dir()
        if not os.path.exists(dir_):
            try:  # sacrifice security if we cannot setup ownership properly
                os.makedirs(dir_)
                os.chmod(dir_, 0o751)
                os.chown(dir_, self.incident["uid"], 0)
            except OSError as e:
                self.log.error("Unable to create dir %s (%s)", dir_, str(e))
        file_name = self.get_file_name(self.ts_to_name(dump_date))
        with drop_privileges(self.incident["uid"]):
            file_ = open(file_name, "wb")  # pylint: disable=consider-using-with
            try:
                os.fchmod(file_.fileno(), 0o400)
            except OSError as e:
                self.log.error("Unable to set file permissions %s (%s)", file_name, str(e))
        return file_

    def delete_old(self, to_ts: int) -> None:
        """
        Delete old snapshots. If there are no more
        :param to_ts: up to which timestamp to remove snapshots
        :return: None
        """
        _dir = self.get_dir()
        files = os.listdir(_dir)
        all_snapshots = self.snapshot_filter(files)
        ts_to_remove = self.snapshot_filter(files, to_ts=to_ts)
        if all_snapshots == ts_to_remove:
            shutil.rmtree(_dir, ignore_errors=True)
        else:
            for ts in ts_to_remove:
                os.remove(self.get_file_name(self.ts_to_name(ts)))

    @staticmethod
    def get_ts(file_: str) -> int | None:
        if file_.endswith(SNAPSHOT_EXT):
            ts = file_[0:-SNAPSHOT_EXT_LEN]
            if ts.isdigit():
                try:
                    return int(ts)
                except ValueError:
                    pass
        return None

    @staticmethod
    def snapshot_filter(
        files: list[str],
        from_ts: int | None = None,
        to_ts: int | None = None,
    ) -> list[int]:
        if from_ts is None:
            from_ts = 0
        if to_ts is None:
            to_ts = sys.maxsize
        result = []
        for filename in files:
            ts = Snapshot.get_ts(filename)
            if ts is not None and from_ts <= ts <= to_ts:
                result.append(ts)
        return sorted(result)

    @staticmethod
    def ts_to_name(ts: int) -> str:
        return str(ts) + SNAPSHOT_EXT


@contextmanager
def drop_privileges(uid: int):
    old_uid, old_gid, old_groups = os.getuid(), os.getgid(), os.getgroups()
    gid = pwd.getpwnam("nobody")[3]
    os.setgroups([])
    os.setegid(gid)
    os.seteuid(uid)
    try:
        yield
    finally:
        os.seteuid(old_uid)
        os.setegid(old_gid)
        os.setgroups(old_groups)