"""Implementation of the Secrets check, which attempts to find leaked
secrets in a git repository. It is really just running a bunch of
open source tools and collecting their output."""
import logging
import re
from collections.abc import Generator, Iterable, Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Optional
from detect_secrets.core import baseline as ds_baseline
from detect_secrets.core.potential_secret import PotentialSecret
from detect_secrets.core.secrets_collection import SecretsCollection
from detect_secrets.settings import Settings, default_settings
from src.config import context
from src.interfaces import CheckInterface
from .interfaces_secrets import SecretInterface, SecretsToolInterface
logger = logging.getLogger(__name__)
# Tool: Yelp / detect-secrets
class DetectSecretsSecret(SecretInterface):
    """Adapter that exposes a detect-secrets ``PotentialSecret`` through
    the ``SecretInterface`` used by the check."""

    def __init__(self, potential_secret: PotentialSecret) -> None:
        """Keep a reference to the wrapped ``potential_secret``."""
        self.secret: PotentialSecret = potential_secret

    def summarize(self) -> dict:
        """Return the result of ``json()`` called on the wrapped secret."""
        summary: dict = self.secret.json()
        return summary
class DetectSecrets(SecretsToolInterface):
    """Secrets tool backed by Yelp's detect-secrets.

    Scan results accumulate in ``self.secrets``. Baselines are stored per
    project as
    ``<Secrets_baselines_dir>/<project_id>/<tool name>.baseline``.
    """

    def __init__(self) -> None:
        """Start with an empty collection of scan results."""
        self.secrets: SecretsCollection = SecretsCollection()

    def check_file(self, f: Path) -> None:
        """Scan the single file ``f`` into ``self.secrets`` using the
        customised detect-secrets settings."""
        with _custom_settings():
            self.secrets.scan_file(f.as_posix())

    def _get_baseline_dir(self, project_id: int) -> Path:
        """Return the directory that holds the baselines of ``project_id``."""
        return context.settings["Secrets_baselines_dir"] / f"{project_id}"

    def _get_baseline_file(self, project_id: int) -> Path:
        """Return the path of this tool's baseline file for ``project_id``."""
        # Derived from _get_baseline_dir so the two helpers cannot drift
        # apart.
        return self._get_baseline_dir(project_id) / f"{self.name()}.baseline"

    def maybe_load_baseline(
        self, project_id: int
    ) -> Optional[SecretsCollection]:
        """Load the stored baseline of ``project_id``.

        Returns:
            The loaded baseline, or ``None`` when the baseline file does
            not exist.
        """
        baseline_file: Path = self._get_baseline_file(project_id)
        if not baseline_file.exists():
            return None
        baseline_path: str = baseline_file.as_posix()
        return ds_baseline.load(
            ds_baseline.load_from_file(baseline_path),
            baseline_path,
        )

    def create_or_overwrite_baseline(self, project_id: int) -> None:
        """Write ``self.secrets`` to the baseline file of ``project_id``,
        creating the baseline directory when needed."""
        baseline_dir: Path = self._get_baseline_dir(project_id)
        baseline_file: Path = self._get_baseline_file(project_id)
        # parents=True: the configured baselines root may not exist yet on
        # a first run; without it mkdir raises FileNotFoundError.
        baseline_dir.mkdir(parents=True, exist_ok=True)
        if baseline_file.exists():
            logger.info(f"Updating existing baseline: {baseline_file}")
        else:
            logger.info(f"Creating baseline: {baseline_file}")
        ds_baseline.save_to_file(self.secrets, baseline_file.as_posix())

    def update_baseline(self, project_id: int) -> None:
        """Refresh the stored baseline from the current scan results.

        When a baseline can be loaded, ``trim`` and ``merge`` are called
        on ``self.secrets`` with it before the result is written to disk.
        Otherwise the current results are written as they are.
        """
        baseline: Optional[SecretsCollection] = self.maybe_load_baseline(
            project_id
        )
        # NOTE(review): this is a truthiness test, not an ``is None`` test.
        # If SecretsCollection defines __bool__/__len__, an existing but
        # empty baseline also takes the "no baseline" branch -- confirm.
        if not baseline:
            logger.info(
                f"No baseline for {self.name()} and project "
                f"{project_id} available."
            )
        else:
            logger.info(
                f"Loaded baseline {baseline.json()} of {self.name()} for "
                f"{project_id} from disk."
            )
            # intersect
            self.secrets.trim(baseline)
            # copy meta information
            self.secrets.merge(baseline)
        self.create_or_overwrite_baseline(project_id)

    def diff_vs_baseline(self, project_id: int) -> Iterable[SecretInterface]:
        """Yield the current findings that are not part of the baseline.

        Without a (truthy) baseline every current finding is yielded.
        This is a generator, so the baseline is only loaded once
        iteration starts.
        """
        baseline: Optional[SecretsCollection] = self.maybe_load_baseline(
            project_id
        )
        diff: SecretsCollection = (
            self.secrets - baseline if baseline else self.secrets
        )
        for _, secret in diff:
            yield DetectSecretsSecret(secret)

    def delete_baseline(self, project_id: int) -> None:
        """Remove the baseline file of ``project_id`` if it exists."""
        baseline_file: Path = self._get_baseline_file(project_id)
        baseline_file.unlink(missing_ok=True)

    def check_files(self, files: Iterable[Path]) -> None:
        """Scan all ``files`` into ``self.secrets`` using the customised
        detect-secrets settings."""
        with _custom_settings():
            self.secrets.scan_files(*(f.as_posix() for f in files))

    @property
    def detected_secrets(
        self,
    ) -> Generator[SecretInterface, None, None]:
        """Yield every finding currently held in ``self.secrets``, wrapped
        as ``DetectSecretsSecret``."""
        for _, potential_secret in self.secrets:
            yield DetectSecretsSecret(potential_secret)
# Check
class Secrets(CheckInterface):
    """Class which represents a check that runs a bunch of secret
    detection tools against a given project and spits out a 'score'."""

    # The dot is escaped so the first alternative only matches a literal
    # ".git"; unescaped it matched any four-character name ending in "git".
    # NOTE(review): presumably consumed by the file-list generation of
    # CheckInterface, which is not visible here -- confirm.
    exclude: re.Pattern = re.compile(r"(^\.git$|test)")

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent class and register the
        secret detection tools to run."""
        super().__init__(*args, **kwargs)
        self.SECRETS_TOOLS: list[type[SecretsToolInterface]] = [DetectSecrets]

    def _detect_secrets(self) -> dict[str, Iterable[SecretInterface]]:
        """Generates, for each tool, the set of results that are not in
        the baseline, i.e. 'R \\ B' for results 'R' and baseline 'B'.

        Baselines are deleted first when ``self.redo_baselines`` is set,
        and are updated or created after the diff has been taken.

        Returns:
            A mapping from tool name to the list of new findings.
        """
        detected_secrets: dict[str, Iterable[SecretInterface]] = {}
        for tool in self.SECRETS_TOOLS:
            tool_instance: SecretsToolInterface = tool()
            # NOTE(review): regenerated for every tool -- presumably
            # _gen_file_list returns a single-use iterable.
            files: Iterable[Path] = self._gen_file_list()
            tool_instance.check_files(files)
            summaries: list = [
                s.summarize() for s in tool_instance.detected_secrets
            ]
            logger.debug(f"{tool_instance.name()} detected: {summaries}")
            if self.redo_baselines:
                tool_instance.delete_baseline(self.proj.id)
            # Materialise the diff before the baseline is rewritten below.
            diff: list[SecretInterface] = list(
                tool_instance.diff_vs_baseline(self.proj.id)
            )
            if diff:
                logger.info(
                    f"{tool_instance.name()} detected secrets that were "
                    "not part of the baseline: "
                    f"{[s.summarize() for s in diff]}"
                )
            else:
                logger.info(f"{tool_instance.name()} detected no new secrets.")
            tool_instance.update_baseline(self.proj.id)
            detected_secrets[tool_instance.name()] = diff
        return detected_secrets

    def _calc_score(
        self, detected_secrets: dict[str, Iterable[SecretInterface]]
    ) -> float:
        """Return 1.0 when no secrets were detected, else 1 / their count.

        Each value of ``detected_secrets`` is consumed once to count it.
        """
        num_secrets: int = sum(
            len(list(secrets)) for secrets in detected_secrets.values()
        )
        return 1.0 if num_secrets == 0 else 1 / num_secrets

    def _process_args(self, args_dict: Optional[dict[str, Any]]) -> None:
        """Set ``self.redo_baselines`` from the ``"baseline"`` argument.

        ``None`` (the default of ``run``) is treated like an empty dict,
        so calling ``run()`` without arguments no longer fails here.
        """
        self.redo_baselines = bool(args_dict and args_dict.get("baseline"))
        if self.redo_baselines:
            logger.info("Recreating baselines for all tools.")

    def run(
        self, args_dict: Optional[dict[str, Any]] = None
    ) -> dict[str, Any]:
        """Run all secret detection tools and report findings and score.

        Returns:
            The parent's ``run`` result merged with a ``"score"`` entry
            and a ``"results"`` entry listing, per tool, the summaries of
            the findings that were not part of its baseline.
        """
        ret: dict[str, Any] = super().run(args_dict)
        self._process_args(args_dict)
        detected_secrets: dict[
            str, Iterable[SecretInterface]
        ] = self._detect_secrets()
        results: dict[str, Any] = {
            "tool_secrets": [
                {
                    "name": tool,
                    "secrets": [s.summarize() for s in secrets],
                }
                for tool, secrets in detected_secrets.items()
            ]
        }
        # NOTE: asserts are stripped under ``python -O``, so this check
        # is skipped there.
        assert self.results_valid(results)
        return ret | {
            "score": self._calc_score(detected_secrets),
            "results": results,
        }
@contextmanager
def _custom_settings() -> Iterator[Settings]:
    """Yield the settings from ``default_settings()`` with the
    ``is_potential_uuid`` heuristic filter disabled.

    NOTE(review): presumably this stops UUID-like strings from being
    dropped as false positives -- confirm against detect-secrets.
    """
    uuid_filter: str = "detect_secrets.filters.heuristic.is_potential_uuid"
    with default_settings() as active_settings:
        active_settings.disable_filters(uuid_filter)
        yield active_settings