Source code for fftools.web

from pathlib import Path
from typing import Any, Literal

import dns.resolver
import httpx
from loguru import logger as log

from fftools.other import pop_dict


def download_file(
    url: str,
    download_path: Path,
    timeout: int = 30,
    follow_redirects: bool = True,
    chunk_size: int | None = 64 * 1024,
    username: str | None = None,
    password: str | None = None,
    **kwargs,
) -> None:
    """download a file with httpx.stream.

    The response is validated (HTTP status and `content-length`) before the
    destination file is opened for writing. On any error the destination
    path is removed, so no partial download is left behind.

    Args:
        url: url of the file.
        download_path: path to save the file to.
        timeout: timeout of the request in seconds. Defaults to 30.
        follow_redirects: follow HTTP 3XX redirects. Defaults to True.
        chunk_size: size of the chunks which are downloaded. Defaults to 64*1024.
        username: username for basic auth. if unset no authentication is used.
            Defaults to None.
        password: password for basic auth. if unset no authentication is used.
            Defaults to None.
        kwargs: other kwargs directly supplied to `httpx.stream`.

    Raises:
        ValueError: if the `content-length` header is missing, zero or negative.
        Exception: any request, HTTP status or file error is logged and re-raised.
    """
    # remove args from kwargs which are set explicitly below
    filtered_kwargs = pop_dict(
        kwargs,
        [
            "url",
            "timeout",
            "follow_redirects",
            "auth",
        ],
    )

    # basic auth is only used if both username and password are given
    auth = (
        httpx.BasicAuth(username=username, password=password)
        if username and password
        else None
    )

    log.info(f"HTTP DOWNLOAD {url}")
    try:
        with httpx.stream(
            "GET",
            url,
            timeout=timeout,
            follow_redirects=follow_redirects,
            auth=auth,
            **filtered_kwargs,
        ) as r:
            r.raise_for_status()

            # a missing header counts as length 0 and is rejected as well
            content_length = int(r.headers.get("content-length", 0))
            if content_length <= 0:
                log.error(f"invalid content length={content_length}")
                raise ValueError(f"invalid content length={content_length}")

            # open (and truncate) the target only after the response is validated
            with download_path.open(mode="wb") as f:
                for chunk in r.iter_bytes(chunk_size=chunk_size):
                    f.write(chunk)
    except Exception as exc:
        log.error(f"can't save file={download_path}. {exc=}")
        # don't leave a partial or empty file behind
        download_path.unlink(missing_ok=True)
        raise
def upload_file(
    method: Literal["POST", "PUT", "PATCH"],
    url: str,
    file_path: Path,
    timeout: int = 30,
    username: str | None = None,
    password: str | None = None,
    **kwargs,
) -> httpx.Response:
    """upload a file with httpx.

    The file is sent as multipart form field `upload-file` and is closed
    again as soon as the request has finished or failed.

    Args:
        method: http method to use for the upload.
        url: upload url.
        file_path: path of file to upload.
        timeout: timeout of the request in seconds. Defaults to 30.
        username: username for basic auth. if unset no authentication is used.
            Defaults to None.
        password: password for basic auth. if unset no authentication is used.
            Defaults to None.
        kwargs: other kwargs directly supplied to `httpx.request`.

    Raises:
        Exception: any file or request error is logged and re-raised.

    Returns:
        `httpx.Response` object
    """
    # remove args from kwargs which are set explicitly below.
    # `files` is included so a caller-supplied value can't collide with ours.
    filtered_kwargs = pop_dict(
        kwargs,
        [
            "url",
            "timeout",
            "auth",
            "files",
        ],
    )

    # basic auth is only used if both username and password are given
    auth = (
        httpx.BasicAuth(username=username, password=password)
        if username and password
        else None
    )

    log.info(f"HTTP UPLOAD {method} {url}")
    try:
        # context manager guarantees the file handle is closed, even on errors
        with file_path.open("rb") as upload_handle:
            response = httpx.request(
                method=method,
                url=url,
                timeout=timeout,
                files={"upload-file": upload_handle},
                auth=auth,
                **filtered_kwargs,
            )
    except Exception as exc:
        log.error(f"cant upload file={file_path} {exc=}")
        raise

    return response
def req(
    method: Literal["GET", "POST", "HEAD", "PUT", "DELETE", "PATCH", "OPTIONS"],
    url: str,
    timeout: float = 5,
    default_headers: bool = True,
    follow_redirects: bool = True,
    payload: dict[str, Any] | str | bytes | None = None,
    username: str | None = None,
    password: str | None = None,
    **kwargs,
) -> httpx.Response:
    """make a http(s) request.

    Args:
        method: HTTP method, e.g. GET.
        url: url for the request.
        timeout: timeout of the request in seconds. Defaults to 5.
        default_headers: send default headers for `content-type` and `accept`.
            They are merged over any `headers` passed via kwargs.
            Defaults to True.
        follow_redirects: follow HTTP 3XX redirects. Defaults to True.
        payload: data to send. a `dict` is sent as json, `str` or `bytes`
            are sent as raw content. Defaults to None.
        username: username for basic auth. if unset no authentication is used.
            Defaults to None.
        password: password for basic auth. if unset no authentication is used.
            Defaults to None.
        kwargs: other kwargs directly supplied to `httpx.request`.

    Raises:
        Exception: any request error is logged and re-raised.

    Returns:
        `httpx.Response` object
    """
    _default_headers = {
        "accept": "application/json",
        "content-type": "application/json; charset=UTF-8",
    }

    # work on a copy so the caller's headers object is never mutated;
    # `headers=None` is treated like "no headers".
    headers = dict(kwargs.get("headers") or {})
    if default_headers:
        headers.update(_default_headers)

    # remove args from kwargs which are set explicitly below
    filtered_kwargs = pop_dict(
        kwargs,
        [
            "url",
            "timeout",
            "headers",
            "follow_redirects",
            "json",
            "content",
            "auth",
        ],
    )

    # basic auth is only used if both username and password are given
    auth = (
        httpx.BasicAuth(username=username, password=password)
        if username and password
        else None
    )

    log.info(f"HTTP {method} {url}")
    try:
        response = httpx.request(
            method=method,
            url=url,
            timeout=timeout,
            headers=headers,
            follow_redirects=follow_redirects,
            # exactly one of json/content is set, depending on the payload type
            json=payload if isinstance(payload, dict) else None,
            content=payload if isinstance(payload, str | bytes) else None,
            auth=auth,
            **filtered_kwargs,
        )
    except Exception as exc:
        log.error(f"error in {method} request. {exc=}")
        raise

    return response
def dig(
    fqdn: str,
    record_type: str,
    nameservers: list[str] | None = None,
    timeout: float = 5,
) -> list[str]:
    """resolve dns record.

    The system resolver configuration is not used; queries are sent to the
    given nameservers only.

    Args:
        fqdn: fqdn of record to resolve
        record_type: type of record, e.g. A or AAAA. case-insensitive.
        nameservers: nameserver ips to query.
            Defaults to None, which uses `9.9.9.9` and `1.1.1.1`.
        timeout: per-query timeout and total lifetime of the lookup in seconds.
            Defaults to 5.

    Raises:
        ValueError: if no answer was found.
        Exception: any resolving error is logged and re-raised.

    Returns:
        a list of resolved records in text form
    """
    record_type = record_type.upper()

    # configure=False: ignore the system resolver configuration
    resolver = dns.resolver.Resolver(configure=False)
    resolver.nameservers = list(nameservers) if nameservers else ["9.9.9.9", "1.1.1.1"]
    resolver.timeout = timeout
    resolver.lifetime = timeout

    log.info(f"DIG {fqdn}")
    try:
        answer = resolver.resolve(fqdn, record_type)
        if not answer.rrset:
            raise ValueError(f"no answer for {fqdn=}, {record_type=}")
        resolved_records = [n.to_text() for n in answer.rrset]
    except Exception as exc:
        log.warning(f"error while resolving record(s). {exc=}")
        raise

    log.info(f"resolved record(s) {fqdn=}, {resolved_records=}")
    return resolved_records