Source code for fftools.shell

import bz2
import gzip
import lzma
import shutil
import subprocess
from pathlib import Path
from typing import Literal

from fabric import Connection
from loguru import logger as log

from fftools.other import pop_dict


[docs] def run_command( command: str | list[str] | tuple[str] | set[str], timeout: int = 10, check_ecode: bool = True, **kwargs, ) -> tuple[int, str, str]: """run local shell command. Args: command: command to run. timeout: timeout in seconds. Defaults to 10. check_ecode: if exception should be raised on non-zero exit codes. Defaults to True. kwargs: other kwargs directly supplied to `subprocess.run` Raises: ValueError: if command is not either a `str`, `list`, `tuple` or `set`. exc: if `check_ecode` is `True` and the exit code is not 0. Returns: tuple of (exit code, stdout, stderr). """ if isinstance(command, str): cmd = command.split() elif isinstance(command, list | tuple | set): cmd = list(command) else: raise ValueError # remove args from kwargs filtered_kwargs = pop_dict( kwargs, [ "check", "timeout", "capture_output", "encoding", ], ) log.info(f"running command='{' '.join(cmd)}'. {timeout=}") try: proc = subprocess.run( cmd, check=check_ecode, timeout=timeout, capture_output=True, encoding="utf8", **filtered_kwargs, ) except Exception as exc: log.error(f"can't run {command=}. {exc=}") raise exc stdout: str = proc.stdout.strip() stderr: str = proc.stderr.strip() exit_code: int = proc.returncode return exit_code, stdout, stderr
[docs] def run_ssh_command( command: str | list[str] | tuple[str] | set[str], hostname: str, username: str = "root", port: int = 22, timeout: int = 10, check_ecode: bool = True, password: str | None = None, **kwargs, ) -> tuple[int, str, str]: """run command on remote machine via ssh. Args: command: command to run. hostname: hostname of remote device. username: ssh username. Defaults to "root". port: ssh port. Defaults to 22. timeout: command timeout. Defaults to 10. check_ecode: if exception should be raised on non-zero exit codes. Defaults to True. password: ssh user password if not authenticated via ssh keys. Defaults to None. kwargs: other kwargs directly supplied to `fabric.Connection.run` Raises: ValueError: if command is not either a `str`, `list`, `tuple` or `set`. exc: if `check_ecode` is `True` and the exit code is not 0. Returns: tuple of (exit code, stdout, stderr) """ if isinstance(command, str): cmd = command elif isinstance(command, list | tuple | set): cmd = " ".join(command) else: raise ValueError # remove args from kwargs filtered_kwargs = pop_dict( kwargs, [ "hide", "warn", "timeout", ], ) log.info( f"running ssh command on host={username}@{hostname}:{port}, command='{cmd}'. {timeout=}", ) try: connection = Connection( host=hostname, user=username, port=port, connect_timeout=10, connect_kwargs={"password": password} if password else {}, ) with connection as con: proc = con.run( cmd, hide=True, warn=(not check_ecode), timeout=timeout, encoding="utf8", **filtered_kwargs, ) except Exception as exc: log.error(f"can't run ssh {command=}. {exc=}") raise exc stdout: str = proc.stdout.strip() stderr: str = proc.stderr.strip() exit_code: int = proc.return_code return exit_code, stdout, stderr
[docs] def extract_archive(archive: Path) -> None: """extract an archive file. supportet are: tar, zip, gzip, bzip2 and xz. Args: archive: path of the archive to extract. """ log.debug(f"trying to extract archive={archive}") dual_suffix = "".join(archive.suffixes[-2:]) archive_suffix = dual_suffix if ".tar." in dual_suffix else archive.suffix tarzip_suffixes = [".tar.bz2", ".tbz2", ".tar.gz", ".tgz", ".tar", ".tar.xz", ".txz", ".zip"] xz_suffixes = [".xz"] bz_suffixes = [".bz", ".bzip", ".bz2", ".bzip2"] gz_suffixes = [".gz", ".gzip"] if archive_suffix in tarzip_suffixes: extract_dir = archive.parent / archive.name.removesuffix(archive_suffix) log.info(f"extracting tarzip archive={archive}, extract to={extract_dir}") shutil.unpack_archive(archive, extract_dir) elif archive_suffix in bz_suffixes: extract_file("bzip", archive, archive.with_suffix("")) elif archive_suffix in gz_suffixes: extract_file("gzip", archive, archive.with_suffix("")) elif archive_suffix in xz_suffixes: extract_file("xz", archive, archive.with_suffix("")) else: log.debug("no matching archive suffix found")
[docs] def extract_file(atype: Literal["gzip", "bzip", "xz"], infile: Path, outfile: Path) -> None: """extract file. Args: atype: archive type. infile: compressed input file outfile: decompressed output file. Raises: ValueError: on invalid `atype`. """ log.info(f"extracting {atype} file={infile} -> {outfile}") try: match atype: case "gzip": with gzip.open(infile, "rb") as i, outfile.open("wb") as o: shutil.copyfileobj(i, o, length=64 * 1024) case "bzip": with bz2.open(infile, "rb") as i, outfile.open("wb") as o: shutil.copyfileobj(i, o, length=64 * 1024) case "xz": with lzma.open(infile, "rb") as i, outfile.open("wb") as o: shutil.copyfileobj(i, o, length=64 * 1024) case _: log.error(f"invalid archive type={atype}") raise ValueError except Exception as exc: log.error(f"cant extract {atype} archive={infile}. {exc=}")