"""Module which provides the methods that correspond to the subcommands
of occmd"""
import json
import logging
import os
import subprocess as sp
from argparse import Namespace
from collections.abc import Callable, Generator
from pathlib import Path
from time import time
from typing import Any, Optional, cast
from urllib.parse import urlparse
from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from git.repo import Repo
from gitlab import Gitlab
from gitlab.base import RESTObject, RESTObjectList
from gitlab.exceptions import GitlabGetError
from gitlab.v4.objects import Project
from src import checks
from src.config import context
from src.dashboard import Dashboard # type: ignore[attr-defined]
from src.exceptions import CheckGoesBoomError, CoreGoesBoomError
from src.interfaces import CheckInterface
from src.opencode_git import clone_project
# Module-level logger, named after this module via __name__.
logger: logging.Logger = logging.getLogger(__name__)
[docs]
class OpenCode:
"""Class which provides the methods that correspond to the
subcommands of occmd.

The connection settings below are class attributes, so they are read
once when the class body is executed.
"""
# Base URL of the Gitlab instance, taken from the "oc_gl_url" setting.
url_opencode: str = context.settings["oc_gl_url"]
# API token from the OC_GL_APIKEY environment variable; None when unset.
apikey: Optional[str] = os.environ.get("OC_GL_APIKEY", default=None)
# Root directory of the local repository dump, taken from the
# "local_repo_db_path_db_raw" setting.
p_db_raw: Path = context.settings["local_repo_db_path_db_raw"]
[docs]
def __init__(self):
    """Create the Gitlab API client and reset the lazy caches.

    The client is built from the class-level ``url_opencode`` and
    ``apikey``. The project and user listings start out as ``None`` and
    are only fetched on first access of the corresponding properties.
    """
    client = Gitlab(self.url_opencode, private_token=self.apikey)
    self.gl = client
    self._projects = self._users = None
@property
def users(self) -> list[RESTObject] | RESTObjectList:
    """Return all users registered on the OpenCoDE platform.

    The listing is requested from the Gitlab API on first access and
    cached on the instance afterwards.

    Note: might not work if they change the API config.
    """
    cached = self._users
    if cached is not None:
        return cached
    cached = self.gl.users.list(get_all=True)
    self._users = cached
    return cached
@property
def projects(self) -> list[Project]:
    """Return all projects listed on the OpenCoDE platform.

    The listing is requested from the Gitlab API on first access and
    cached on the instance afterwards.
    """
    if self._projects is not None:
        return self._projects
    listing = self.gl.projects.list(get_all=True)
    self._projects = cast(list[Project], listing)
    return self._projects
[docs]
def get_project_by_id(self, _id: int) -> Project:
    """Fetch the Gitlab API object of a project by its id.

    :param _id: numeric Gitlab id of the project
    :returns: the object ``self.gl.projects.get`` yields for that id
    """
    project = self.gl.projects.get(_id)
    return project
[docs]
def get_repo_for_proj(self, proj: Project) -> Repo:
    """
    Get the local repository for a remote project from the local OpenCoDE
    dump.

    :param proj: Gitlab API object of the project to look up
    :returns: repository object for the project's directory below
        ``p_db_raw``
    :raises NoSuchPathError: if no local dump is configured (``p_db_raw``
        is ``.``) or the expected directory does not exist
    """
    try:
        if self.p_db_raw.as_posix() == ".":
            logger.info("No local OpenCoDE dump configured")
            # Give the exception a reason so the log line written by the
            # handler below does not end in an empty "()".
            raise NoSuchPathError("no local OpenCoDE dump configured")
        # note: proj.http_url_to_repo.path is attacker controlled, path
        # traversal should not be an issue, due to Gitlab naming rules
        # it will always expect a folder which is 1+ deep and ends in .git
        repo: Repo = Repo(
            self.p_db_raw.as_posix() + urlparse(proj.http_url_to_repo).path
        )
    except NoSuchPathError as error:
        logger.info(
            "Failed to find repo for %s locally (%s)",
            proj.name_with_namespace,
            error,
        )
        # Bare raise keeps the original traceback intact.
        raise
    return repo
[docs]
def clone_repo_for_proj(self, proj: Project) -> Repo:
    """
    Get the local repository for a remote project via cloning to temporary
    directory

    :param proj: Gitlab API object of the project to clone
    :returns: repository object for the fresh clone
    """
    # Local import: the module is only needed by this method.
    import tempfile

    # mkdtemp creates a new, uniquely named directory that only the current
    # user can access. This replaces a predictable path under a hard-coded
    # /tmp that embedded the remote-controlled project name.
    parent: Path = Path(tempfile.mkdtemp(prefix="_occmd_"))
    # Clone into a child that does not exist yet, so the clone target is
    # still a fresh path as it was before.
    local_dir: Path = parent / "repo"
    clone_project(proj.http_url_to_repo, local_dir)
    return Repo(local_dir)
# pylint: disable=too-complex
[docs]
def iter_projects(
    self,
    filter_func: Callable[[Project, Repo], bool] = lambda *_: False,
    _id: Optional[int] = None,
    directory: Optional[Path] = None,
) -> Generator[tuple[Project, Repo], None, None]:
    """
    Generate pairs of API object and local repository object.

    :param filter_func: skip project if this functions maps it to True;
                        only applied when iterating over all projects
    :param _id: optional, Gitlab id of the project whose git root is
                given via the 'directory' parameter
    :param directory: optional, local file system location of the git
                root of the project given via the '_id' parameter
    :returns: Api objects and local repo objects for all projects on
              OpenCoDE that are found in the local dump. If both '_id'
              and 'directory' are given: only a single tuple for that
              project.
    :raises CoreGoesBoomError: if '_id' is unknown to the remote Gitlab
              or 'directory' is not a git repository
    """
    # The case where we were called with a local repository and get
    # the corresponding API object via the provided project id.
    if _id and directory:
        # Only the two lookups can raise the handled errors, so the yield
        # stays outside of the try block.
        try:
            single = (self.get_project_by_id(_id), Repo(directory))
        except GitlabGetError as e:
            raise CoreGoesBoomError(
                f"Unable to find id {_id} on remote Gitlab"
            ) from e
        except InvalidGitRepositoryError as e:
            raise CoreGoesBoomError(
                f"FS location {directory} does not look like a git repository"
            ) from e
        yield single
        return

    # We were not provided with a path to a local repository and
    # have to get the code from elsewhere.
    for proj in self.projects:
        try:
            repo: Repo = Repo(
                self.p_db_raw.as_posix()
                + urlparse(proj.http_url_to_repo).path
            )
        except NoSuchPathError as error:
            # Projects without a local copy are skipped, not fatal.
            logger.info(
                "Failed to find repo for %s locally (%s)",
                proj.name_with_namespace,
                error,
            )
            continue
        if filter_func(proj, repo):
            continue
        yield (proj, repo)
[docs]
def dashboard(self, cli_args: Namespace, /, *_: Any, **__: Any) -> None:
    """Visualize the state of the platform as a table.

    :param cli_args: parsed command line arguments; handed to the
        dashboard as a plain dict
    """
    options: dict[str, Any] = vars(cli_args)
    board = Dashboard(self)
    board.run(options)
[docs]
def graph(self) -> None:
    """Visualize the state of the platform as a graph.

    Currently a placeholder: the method has no implementation and simply
    returns ``None``.
    """
    return None
[docs]
def _construct_check_filter(
    self, check: Optional[str] = None
) -> Callable[[type[CheckInterface]], bool]:
    """
    Build the predicate that decides which checks are skipped.

    :param check: name of the only check to keep; a falsy value or the
        literal string "None" selects every check
    returns:
        Function on checks that returns True IFF the check should
        be skipped. Default: skip no checks
    """
    wanted = check
    keep_everything = (not wanted) or wanted == "None"

    def skip_nothing(_):
        return False

    def skip_unless_named(candidate):
        return candidate.name() != wanted

    if keep_everything:
        return skip_nothing
    return skip_unless_named
[docs]
def _construct_repo_filter(
    self, repo_id: Optional[int] = None
) -> Callable[[RESTObject, Repo], bool]:
    """
    Build the predicate that decides which repositories are skipped.

    :param repo_id: Gitlab id of the only project to keep; a falsy value
        selects every project
    returns:
        Function on repository API instance and local instance
        that returns True IFF the repository should
        be skipped. Default: skip no repositories
    """
    if repo_id:

        def skip_other_ids(project, _repo):
            return project.id != repo_id

        return skip_other_ids

    def skip_nothing(*_):
        return False

    return skip_nothing
[docs]
def check(self, cli_args: Namespace, /, *_: Any, **__: Any) -> None:
    """Performs a set of checks on a set of repositories.

    The collected results are written to stdout as one JSON list. A check
    that raises or returns invalid results is logged and left out of the
    output; the remaining checks still run.

    :param cli_args: parsed command line arguments; copied into a dict
        that is handed to every check
    """
    args_dict: dict[str, Any] = dict(vars(cli_args))
    # validation of arguments
    if not checks.validate_args(args_dict):
        return
    # transformation of arguments
    directory, _id = checks.transform_args(args_dict)
    # str() turns a missing check name into "None", which
    # _construct_check_filter treats as "run every check".
    check_filter: Callable[
        [type[CheckInterface]], bool
    ] = self._construct_check_filter(str(args_dict.get("check")))
    repo_filter: Callable[
        [RESTObject, Repo], bool
    ] = self._construct_repo_filter(_id)
    # run checks and collect the results
    results: list[dict[str, Any]] = []
    for proj, repo in self.iter_projects(
        filter_func=repo_filter, directory=directory, _id=_id
    ):
        for current in checks.iter_checks(
            proj, repo, self.gl, filter_func=check_filter
        ):
            try:
                outcome: dict[str, Any] = current.run(args_dict)
                # Explicit validation instead of `assert`, which is
                # stripped when Python runs with -O.
                if not checks.results_valid(outcome):
                    logger.error(
                        "Check %s for %s returned invalid results",
                        current.name(),
                        proj.id,
                    )
                    continue
                results.append(outcome)
            except CheckGoesBoomError as error:
                logger.error(
                    "An error occurred during check %s for %s: %s",
                    current.name(),
                    proj.id,
                    error,
                )
            except Exception as error:
                # Boundary handler: one failing check must not abort the
                # whole run, so log with traceback and move on.
                logger.exception(
                    "Unexpected error during check %s for %s: %s",
                    current.name(),
                    proj.id,
                    error,
                )
    # write results to stdout
    print(json.dumps(results))  # noqa: T201
[docs]
def update(self, *_: Any, **__: Any) -> None:
    """Updates the local OpenCoDE mirror.

    Walks over all projects of the platform. A project that has no
    directory below ``p_db_raw`` yet is cloned there; afterwards the
    local copy is fetched, merged and pulled, preferring the remote side
    on conflicts. Failures for a single project are logged and do not
    stop the run.
    """
    # TODO?: Handle branches https://stackoverflow.com/questions/
    # 67699/how-do-i-clone-all-remote-branches/4754797#4754797
    # pylint: disable=too-complex
    # Ids of projects that are skipped because they are too large.
    skipped_ids: frozenset[int] = frozenset({908})
    total: int = len(self.projects)
    # Count from 1 so the progress message reads "1 of N" ... "N of N".
    for n, proj in enumerate(self.projects, start=1):
        if int(proj.id) in skipped_ids:
            logger.info(
                "Skipped ridiculously large project %s",
                proj.name_with_namespace,
            )
            continue
        logger.info("Processing project %s of %s", n, total)
        url_path: str = str(urlparse(proj.http_url_to_repo).path)
        repo_location: str = self.p_db_raw.as_posix() + url_path
        try:  # to find the repo locally
            repo: Repo = Repo(repo_location)
        except NoSuchPathError as lookup_error:
            logger.info(
                "Failed to find repo for '%s' locally (%s)",
                proj.name_with_namespace,
                lookup_error,
            )
            try:  # to clone the repo
                clone_project(
                    str(proj.http_url_to_repo),
                    self.p_db_raw / Path(url_path).relative_to("/"),
                )
                repo = Repo(repo_location)
            except sp.CalledProcessError as clone_error:
                # give up on project
                logger.error(
                    "Failed to clone repo for '%s' (%s)",
                    proj.name_with_namespace,
                    clone_error,
                )
                continue
        try:  # to update, anticipate repos in an inconsistent state
            git = repo.git
            git.fetch()
            git.merge("--strategy-option", "theirs", "--no-edit")
            git.pull("-X", "theirs")
        except GitCommandError as update_error:
            if "not something we can merge" in str(update_error.stderr):
                logger.info(
                    "Failed to update repo for '%s' (it is probably empty)",
                    proj.name_with_namespace,
                )
            else:
                logger.error(
                    "Failed to update repo for '%s' (%s)",
                    proj.name_with_namespace,
                    update_error,
                )