File: //proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/lib/snapshot.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import json
import logging
import os
import pwd
import shutil
import sys
import zlib
from collections.abc import Iterator
from contextlib import contextmanager
from typing import BinaryIO
# Root directory for snapshots; each user gets a sub-directory named after the uid.
SNAPSHOT_PATH = "/var/lve/snapshots"
# Suffix of a snapshot file name; the part before it is the integer dump timestamp.
SNAPSHOT_EXT = ".snapshot"
# Length of the suffix, used to strip it when parsing a timestamp from a file name.
SNAPSHOT_EXT_LEN = len(SNAPSHOT_EXT)
class Snapshot:
"""
Class responsible for loading and saving snapshot files for the interval.
The files will be saved in a format of: `/var/lve/snapshots/[uid]/dump_time.snapshot`
dump_time is the timestamp/integer.
The directories `/var/lve/snapshots/[uid]` and the dump files themselves will be owned by user.
They will not be readable by other users.
"""
def __init__(self, incident: dict, compresslevel: int = 1) -> None:
self.compresslevel = compresslevel
self.incident = incident
self.log = logging.getLogger("lib-snapshot")
def save(self, data: dict) -> str:
dump_date = data["dump_time"]
assert dump_date is not None
# convert possible non-ascii data to unicode
self._replace_unicode_data(data)
json_compressed = zlib.compress(json.dumps(data).encode(), self.compresslevel)
with self.create_file(dump_date) as f:
f.write(json_compressed)
self.log.debug("Snapshot dumped to file %s", f.name)
return f.name
@staticmethod
def _to_unicode(obj):
if isinstance(obj, bytes):
return obj.decode("utf-8", "replace")
return obj
def _replace_unicode_data(self, data: dict) -> None:
u_queries = []
for query in data.get("snap_sql", []):
u_queries.append(list(map(self._to_unicode, query)))
data["snap_sql"] = u_queries
u_urls = []
for http in data.get("snap_http", []):
u_urls.append(list(map(self._to_unicode, http)))
data["snap_http"] = u_urls
def get_file_list(self) -> list[str]:
dir_ = self.get_dir()
if os.path.isdir(dir_):
return os.listdir(dir_)
return []
def get_ts_list(
self,
from_ts: int | None,
to_ts: int | None,
) -> list[int]:
"""
Return ordered list of timestamps when snapshots for this user were created.
:param from_ts: Start timestamp for filtering snapshots (inclusive).
If None, starts from the earliest snapshot.
:param to_ts: End timestamp for filtering snapshots (inclusive).
If None, includes all snapshots up to the latest.
:return: List of timestamps ordered for the given period.
"""
return self.snapshot_filter(self.get_file_list(), from_ts, to_ts)
def get_snapshots(
self,
from_ts: int | None,
to_ts: int | None,
) -> Iterator[dict]:
"""
Get snapshots for a given period.
Yields one snapshot at a time for memory efficiency.
:param from_ts: Start timestamp for filtering snapshots (inclusive).
If None, starts from the earliest snapshot.
:param to_ts: End timestamp for filtering snapshots (inclusive).
If None, includes all snapshots up to the latest.
:return: Iterator of snapshot dictionaries.
"""
for ts in self.get_ts_list(from_ts, to_ts):
try:
filename = self.get_file_name(self.ts_to_name(ts))
if not os.geteuid():
with drop_privileges(self.incident["uid"]):
content = self.read_file_content(filename)
else:
content = self.read_file_content(filename)
snapshot_data = json.loads(content)
yield snapshot_data
except (ValueError, OSError) as ve:
self.log.warning(
"Corrupted file: %s (%s)",
self.get_file_name(self.ts_to_name(ts)),
str(ve),
)
def read_file_content(self, filename: str) -> str:
with open(filename, "rb") as f:
content = f.read()
try:
content = zlib.decompress(content)
except zlib.error:
compressed_content = zlib.compress(content, self.compresslevel)
with open(filename, "wb") as f:
f.write(compressed_content)
return content.decode()
def get_incident_snapshots(self) -> list:
"""
Load all snapshots for given incident
:return: list of snapshots
"""
return list(self.get_snapshots(self.incident["incident_start_time"], self.incident["incident_end_time"]))
def get_dir(self) -> str:
return os.path.join(SNAPSHOT_PATH, str(self.incident["uid"]))
def get_file_name(self, name: str) -> str:
return os.path.join(self.get_dir(), name)
def create_file(self, dump_date: int) -> BinaryIO:
"""
create file, change its ownership & permissions if needed. Create directories if needed as well
:param dump_date: int timestamp used as file name
:return: open File object
"""
dir_ = self.get_dir()
if not os.path.exists(dir_):
try: # sacrifice security if we cannot setup ownership properly
os.makedirs(dir_)
os.chmod(dir_, 0o751)
os.chown(dir_, self.incident["uid"], 0)
except OSError as e:
self.log.error("Unable to create dir %s (%s)", dir_, str(e))
file_name = self.get_file_name(self.ts_to_name(dump_date))
with drop_privileges(self.incident["uid"]):
file_ = open(file_name, "wb") # pylint: disable=consider-using-with
try:
os.fchmod(file_.fileno(), 0o400)
except OSError as e:
self.log.error("Unable to set file permissions %s (%s)", file_name, str(e))
return file_
def delete_old(self, to_ts: int) -> None:
"""
Delete old snapshots. If there are no more
:param to_ts: up to which timestamp to remove snapshots
:return: None
"""
_dir = self.get_dir()
files = os.listdir(_dir)
all_snapshots = self.snapshot_filter(files)
ts_to_remove = self.snapshot_filter(files, to_ts=to_ts)
if all_snapshots == ts_to_remove:
shutil.rmtree(_dir, ignore_errors=True)
else:
for ts in ts_to_remove:
os.remove(self.get_file_name(self.ts_to_name(ts)))
@staticmethod
def get_ts(file_: str) -> int | None:
if file_.endswith(SNAPSHOT_EXT):
ts = file_[0:-SNAPSHOT_EXT_LEN]
if ts.isdigit():
try:
return int(ts)
except ValueError:
pass
return None
@staticmethod
def snapshot_filter(
files: list[str],
from_ts: int | None = None,
to_ts: int | None = None,
) -> list[int]:
if from_ts is None:
from_ts = 0
if to_ts is None:
to_ts = sys.maxsize
result = []
for filename in files:
ts = Snapshot.get_ts(filename)
if ts is not None and from_ts <= ts <= to_ts:
result.append(ts)
return sorted(result)
@staticmethod
def ts_to_name(ts: int) -> str:
return str(ts) + SNAPSHOT_EXT
@contextmanager
def drop_privileges(uid: int) -> Iterator[None]:
    """
    Temporarily run the enclosed block with reduced privileges.

    Inside the block the process has no supplementary groups, the effective gid
    of the "nobody" user and the effective uid ``uid``. On exit (normal or via
    an exception) the previous effective uid, effective gid and supplementary
    groups are restored.

    :param uid: effective user id to switch to
    :raises KeyError: if there is no "nobody" entry in the password database
    :raises OSError: if the process is not allowed to change its ids/groups
    """
    # Save the *effective* ids: those are the ones changed below.
    old_euid, old_egid, old_groups = os.geteuid(), os.getegid(), os.getgroups()
    gid = pwd.getpwnam("nobody").pw_gid
    os.setgroups([])
    try:
        # The group must be changed before the uid: once the euid is dropped
        # the process may no longer be allowed to change its gid.
        os.setegid(gid)
        os.seteuid(uid)
        yield
    finally:
        # Restore in reverse order; this also undoes a partial drop when
        # setegid()/seteuid() above failed.
        os.seteuid(old_euid)
        os.setegid(old_egid)
        os.setgroups(old_groups)