"""Queue Woodpecker CI events and apply them one at a time in a child process."""
import logging
import threading
from collections import deque
from multiprocessing import Process, Lock
from pathlib import Path
from typing import Annotated

from injectable import injectable, Autowired, autowired

from app.config import get_settings
from app.model.webhook import WoodpeckerEvent
from app.services import GitService, DockerService
from app.services.mo import Mo

logger = logging.getLogger(__name__)


class WoodpeckerRunner(Process):
    """Daemon process that applies a single CI event.

    ``run`` executes in the child process, so ``success_callback`` and
    ``error_callback`` are invoked in the child as well: ordinary attribute
    changes they make are not visible to the parent, and they must not try to
    ``join`` this process.
    """

    def __init__(self, git: GitService, docker: DockerService, mo: Mo,
                 success_callback=None, error_callback=None):
        """Store the collaborators and optional callbacks.

        Args:
            git: Service used to check out the event's commit.
            docker: Service used to reload the affected compose file.
            mo: Processor applied to every changed file containing ``.mo.``.
            success_callback: Optional zero-argument callable invoked (in the
                child process) when the event was applied or nothing had to
                be done.
            error_callback: Optional one-argument callable invoked (in the
                child process) with the exception that aborted the run.
        """
        super().__init__(daemon=True)
        self._git = git
        self._docker = docker
        self._mo = mo
        self._success_callback = success_callback
        self._error_callback = error_callback
        self._event: WoodpeckerEvent | None = None
        self._root = get_settings().git.path

    def process_event(self, event: WoodpeckerEvent):
        """Remember ``event`` and start the child process that handles it."""
        self._event = event
        self.start()

    def run(self):
        """Apply the stored event; executed in the child process.

        Determines the affected service, checks out the event's commit,
        processes every changed ``.mo.`` file and reloads the service's
        compose file. Any exception is handed to the error callback instead
        of propagating.
        """
        try:
            service = self.get_service(self._event.files)
            if service is None:
                logger.info("No service found.")
                return self._notify_success()

            # pathlib joining works whether or not the configured root ends
            # with a path separator.
            root = Path(self._root)
            compose_file = root / "compose" / service / "docker-compose.yml"

            self._git.checkout(self._event.commit)
            for file in self._event.files:
                if ".mo." in file:
                    # Strip a leading "/" so the join stays below the root.
                    self._mo.process((root / file.lstrip("/")).absolute())
            self._docker.reload(compose_file.absolute())
            return self._notify_success()
        except Exception as e:
            # Called from inside the handler so a callback that logs with
            # exc_info=True still sees the active exception.
            return self._notify_error(e)

    def get_service(self, files: list[str]) -> str | None:
        """Return the single service touched by ``files``.

        A file belongs to a service when its first path segment is
        ``compose`` or ``files``; the second segment is the service name.

        Args:
            files: Changed file paths, using ``/`` as separator.

        Returns:
            The service name, or ``None`` when no file belongs to a service.

        Raises:
            Exception: If the files belong to more than one service.
        """
        services: list[str] = []
        for path in files:
            parts = path.split("/")
            # Require a second segment: a bare "compose" names no service.
            if len(parts) > 1 and parts[0] in ("compose", "files"):
                if parts[1] not in services:
                    services.append(parts[1])

        if not services:
            return None
        if len(services) > 1:
            raise Exception("Multiple services are not supported.")
        return services[0]

    def _notify_success(self):
        """Invoke the success callback, if one was supplied."""
        if self._success_callback is not None:
            return self._success_callback()
        return None

    def _notify_error(self, error: Exception):
        """Invoke the error callback with ``error``, if one was supplied."""
        if self._error_callback is not None:
            return self._error_callback(error)
        return None


@injectable(singleton=True)
class Woodpecker:
    """Accepts CI events and runs them strictly one after another.

    At most one ``WoodpeckerRunner`` is active at a time; events arriving in
    the meantime are queued and started in arrival order.
    """

    @autowired
    def __init__(self, mo: Annotated[Mo, Autowired]):
        """Create the services, the pending-event queue and its lock."""
        self._mo = mo
        self._git = GitService()
        self._docker = DockerService()
        self._runner: WoodpeckerRunner | None = None
        self._pending: deque[WoodpeckerEvent] = deque()
        # Guards _runner and _pending; not re-entrant.
        self._lock = Lock()
        logger.info("Woodpecker initialized.")

    def on_ci_event(self, event: WoodpeckerEvent):
        """Queue ``event`` and start it immediately if no runner is active."""
        logger.info("Received event: %s", event)
        with self._lock:
            # Always go through the queue so arrival order is preserved, and
            # decide + start inside one critical section so two concurrent
            # events cannot both start a runner.
            self._pending.append(event)
            if self._runner is None:
                self._launch_locked(self._pending.popleft())

    def _start_runner(self, event: WoodpeckerEvent):
        """Start a runner for ``event``; must be called without the lock held."""
        with self._lock:
            self._launch_locked(event)

    def _launch_locked(self, event: WoodpeckerEvent):
        """Start a runner for ``event``; the caller must hold ``self._lock``."""
        runner = WoodpeckerRunner(self._git, self._docker, self._mo,
                                  self._on_runner_completed,
                                  self._on_runner_error)
        runner.process_event(event)
        # Only record the runner once it has actually started, so a failed
        # start does not block the queue.
        self._runner = runner
        # The callbacks run in the child process, so the parent needs its own
        # way to notice completion: wait for the child in a daemon thread.
        threading.Thread(target=self._await_runner, args=(runner,),
                         daemon=True).start()

    def _await_runner(self, runner: WoodpeckerRunner):
        """Wait in the parent for ``runner`` to exit, then start the next event."""
        runner.join()
        if runner.exitcode:
            logger.warning("Runner exited with code %s.", runner.exitcode)
        with self._lock:
            self._runner = None
            if self._pending:
                self._launch_locked(self._pending.popleft())

    def _on_runner_completed(self):
        """Log successful completion; invoked inside the runner's process."""
        logger.info("Runner completed.")

    def _on_runner_error(self, t: Exception):
        """Log a failed run; invoked inside the runner's process."""
        logger.error("Runner error: %s", t, exc_info=True)