import logging from collections import deque from multiprocessing import Process, Lock from pathlib import Path from typing import Annotated from injectable import injectable, Autowired, autowired from app.config import get_settings from app.model.webhook import WoodpeckerEvent from app.services import Passwords, GitService, DockerService logger = logging.getLogger(__name__) class WoodpeckerRunner(Process): def __init__(self, git: GitService, docker: DockerService, passwords: Passwords, success_callback=None, error_callback=None): super().__init__(daemon=True) self._git = git self._docker = docker self._passwords = passwords self._success_callback = success_callback self._error_callback = error_callback self._event: WoodpeckerEvent | None = None def process_event(self, event: WoodpeckerEvent): self._event = event self.start() def run(self): try: service = self.get_service(self._event.files) if service is None: logger.info("No service found.") return self._success_callback() service_path = f"{get_settings().git.path}/compose/{service}/docker-compose.yml" self._git.checkout(self._event.commit) for file in self._event.files: if file.__contains__('.mo.'): pass self._docker.reload(Path(service_path)) return self._success_callback() except Exception as e: return self._error_callback(e) def get_service(self, files: list[str]) -> str | None: supported_files = [] for f in files: f_parts = f.split("/") if f_parts[0] in ["compose", "files"]: supported_files.append(f[1]) match len(set(supported_files)): case 0: return None case 1: return supported_files[0] case _: raise Exception("Multiple services are not supported.") @injectable(singleton=True) class Woodpecker: @autowired def __init__(self, passwords: Annotated[Passwords, Autowired]): self._passwords = passwords self._git = GitService() self._docker = DockerService() self._runner: WoodpeckerRunner | None = None self._pending = deque() self._lock = Lock() logger.info("Woodpecker initialized.") def on_ci_event(self, event: WoodpeckerEvent): logger.info(f"Received event: {event}") with self._lock: if len(self._pending) > 0 or self._runner is not None: self._pending.append(event) return self._start_runner(event) def _start_runner(self, event: WoodpeckerEvent): with self._lock: self._runner = WoodpeckerRunner(self._git, self._docker, self._on_runner_completed) self._runner.process_event(event) def _on_runner_completed(self): logger.info("Runner completed.") self._runner.join() with self._lock: self._runner = None if len(self._pending) > 0: event = self._pending.popleft() self._start_runner(event) def _on_runner_error(self, t: Exception): logger.error(f"Runner error: {t}", exc_info=True) self._runner.join() with self._lock: self._runner = None if len(self._pending) > 0: event = self._pending.popleft() self._start_runner(event)