Source code for src.opencode

"""Module which provides the methods that correspond to the subcommands
of occmd"""
import json
import logging
import os
import subprocess as sp
from argparse import Namespace
from collections.abc import Callable, Generator
from pathlib import Path
from time import time
from typing import Any, Optional, cast
from urllib.parse import urlparse

from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from git.repo import Repo
from gitlab import Gitlab
from gitlab.base import RESTObject, RESTObjectList
from gitlab.exceptions import GitlabGetError
from gitlab.v4.objects import Project

from src import checks
from src.config import context
from src.dashboard import Dashboard  # type: ignore[attr-defined]
from src.exceptions import CheckGoesBoomError, CoreGoesBoomError
from src.interfaces import CheckInterface
from src.opencode_git import clone_project

logger: logging.Logger = logging.getLogger(__name__)


[docs] class OpenCode: """Class which provides the methods that correspond to the subcommands of occmd""" url_opencode: str = context.settings["oc_gl_url"] apikey: Optional[str] = os.environ.get("OC_GL_APIKEY", default=None) p_db_raw: Path = context.settings["local_repo_db_path_db_raw"]
[docs] def __init__(self): self.gl = Gitlab(self.url_opencode, private_token=self.apikey) self._projects = None self._users = None
@property def users(self) -> list[RESTObject] | RESTObjectList: """returns: All users registered on the OpenCoDE platform Note: might not work if they change the API config""" if self._users is None: self._users = self.gl.users.list(get_all=True) return self._users @property def projects(self) -> list[Project]: """returns: All projects listed on the OpenCoDE platform""" if self._projects is None: self._projects = cast( list[Project], self.gl.projects.list(get_all=True) ) return self._projects
[docs] def get_project_by_id(self, _id: int) -> Project: """Get meta information about a project from its id""" return self.gl.projects.get(_id)
[docs] def get_repo_for_proj(self, proj: Project) -> Repo: """ Get the local repository for a remote project from the local OpenCoDE dump. """ try: if self.p_db_raw.as_posix() == ".": logger.info("No local OpenCoDE dump configured") raise NoSuchPathError # note: proj.http_url_to_repo.path is attacker controlled, path # traversal should not be an issue, due to Gitlab naming rules # it will always expect a folder which is 1+ deep and ends in .git repo: Repo = Repo( self.p_db_raw.as_posix() + urlparse(proj.http_url_to_repo).path ) except NoSuchPathError as E: logger.info( f"Failed to find repo for {proj.name_with_namespace}) locally ({E})" ) raise E return repo
[docs] def clone_repo_for_proj(self, proj: Project) -> Repo: """ Get the local repository for a remote project via cloning to temporary directory """ local_dir: Path = Path(f"/tmp/_occmd_{time()}_{proj.name}") clone_project(proj.http_url_to_repo, local_dir) return Repo(local_dir)
# pylint: disable=too-complex
[docs] def iter_projects( self, filter_func: Callable[[Project, Repo], bool] = lambda *_: False, _id: Optional[int] = None, directory: Optional[Path] = None, ) -> Generator[tuple[Project, Repo], None, None]: """ :param filter_func: skip project if this functions maps it to True :param _id: optional, Gitlab id of the project specified via 'directory' parameter :param directory: optional, local file system location of git root of project specified via 'id' parameter :returns: Api objects and local repo objects for all projects on OpenCoDE. Optionally: Only generates a single tuple for the project specified by 'id' and 'directory' parameters. """ # The case where we were called with a local repository and get # the corresponding API object via the provided project id. if _id and directory: try: yield (self.get_project_by_id(_id), Repo(directory)) except GitlabGetError as e: raise CoreGoesBoomError( f"Unable to find id {_id} on remote Gitlab" ) from e except InvalidGitRepositoryError as e: raise CoreGoesBoomError( f"FS location {directory} does not look like a git repository" ) from e # We were not provided with a path to a local repository and # have to get the code from elsewhere. else: for proj in self.projects: try: repo: Repo = Repo( str( self.p_db_raw.as_posix() + urlparse(proj.http_url_to_repo).path ) ) except NoSuchPathError as E: logger.info( "Failed to find repo for " f"{proj.name_with_namespace})" f"locally ({E})" ) continue if filter_func(proj, repo): continue yield (proj, repo)
[docs] def dashboard(self, cli_args: Namespace, /, *_: Any, **__: Any) -> None: """Visualize the state of the platform as a table.""" args_dict: dict[str, Any] = vars(cli_args) Dashboard(self).run(args_dict)
[docs] def graph(self) -> None: """Visualize the state of the platform as a graph."""
[docs] def _construct_check_filter( self, check: Optional[str] = None ) -> Callable[[type[CheckInterface]], bool]: """ returns: Function on checks that returns True IFF the check should be skipped. Default: skip no checks """ if not check or check == "None": return lambda _: False return lambda c: c.name() != check
[docs] def _construct_repo_filter( self, repo_id: Optional[int] = None ) -> Callable[[RESTObject, Repo], bool]: """ returns: Function on repository API instance and local instance that returns True IFF the repository should be skipped. Default: skip no repositories """ if not repo_id: return lambda *_: False return lambda p, _: p.id != repo_id
[docs] def check(self, cli_args: Namespace, /, *_: Any, **__: Any) -> None: """Performs a set of checks on a set of repositories.""" args_dict: dict[str, Any] = dict(vars(cli_args)) # validation of arguments if not checks.validate_args(args_dict): return # transformation of arguments directory, _id = checks.transform_args(args_dict) check_filter: Callable[ [type[CheckInterface]], bool ] = self._construct_check_filter(str(args_dict.get("check"))) repo_filter: Callable[ [RESTObject, Repo], bool ] = self._construct_repo_filter(_id) # run checks and collect the results results: list[dict[str, Any]] = [] for proj, repo in self.iter_projects( filter_func=repo_filter, directory=directory, _id=_id ): for check in checks.iter_checks( proj, repo, self.gl, filter_func=check_filter ): try: tmp: dict[str, Any] = check.run(args_dict) assert checks.results_valid(tmp) results.append(tmp) except CheckGoesBoomError as error: logger.error( f"An error occurred during check {check.name()} for {proj.id}: {error}" ) continue except Exception as error: logger.exception( f"Unexpected error during check {check.name()} for {proj.id}: {error}" ) continue # write results to stdout print(json.dumps(results)) # noqa: T201
[docs] def update(self, *_: Any, **__: Any) -> None: """Updates the local OpenCoDE mirror.""" # TODO?: Handle branches https://stackoverflow.com/questions/ # 67699/how-do-i-clone-all-remote-branches/4754797#4754797""" # pylint: disable=too-complex total: int = len(self.projects) repo: Repo | None = None for n, proj in enumerate(self.projects): if int(proj.id) in set({908}): logging.info( f"Skipped ridiculously large project {proj.name_with_namespace}" ) continue logger.info(f"Processing project {n} of {total}") try: # to find the repo locally repo = Repo( self.p_db_raw.as_posix() + str(urlparse(proj.http_url_to_repo).path) ) except NoSuchPathError as e: logger.info( "Failed to find repo for " f"'{proj.name_with_namespace}'" f" locally ({e})" ) try: # to clone the repo http_url: str = str(proj.http_url_to_repo) clone_project( http_url, self.p_db_raw / Path( str(urlparse(proj.http_url_to_repo).path) ).relative_to("/"), ) repo = Repo( self.p_db_raw.as_posix() + str(urlparse(proj.http_url_to_repo).path) ) # pylint: disable=redefined-outer-name except sp.CalledProcessError as e: # give up on project logger.error( "Failed to clone repo for " f"'{proj.name_with_namespace}'" f" ({e})" ) continue try: # to update, anticipate repos in an incosistent state git = repo.git git.fetch() git.merge("--strategy-option", "theirs", "--no-edit") git.pull("-X", "theirs") except GitCommandError as e: if "not something we can merge" in str(e.stderr): logger.info( "Failed to update repo for " f"'{proj.name_with_namespace}'" " (it is probably empty)" ) else: logger.error( "Failed to update repo for " f"'{proj.name_with_namespace}'" f" ({e})" ) return