import logging
from collections import deque
from multiprocessing import Process, Lock
from typing import Annotated

from injectable import injectable, Autowired, autowired

from app.model.webhook import WoodpeckerEvent
from app.services import Passwords, GitService, DockerService

logger = logging.getLogger(__name__)


class WoodpeckerRunner(Process):
    """Daemon process that handles a single Woodpecker CI event.

    The event is stored by :meth:`process_event`, which also starts the
    process; the actual work happens in :meth:`run`.

    NOTE(review): ``run`` (and therefore both callbacks) executes in the
    child process. State the callbacks mutate is presumably not visible to
    the parent that created the runner -- confirm this is intended.
    """

    def __init__(self, git: GitService, docker: DockerService,
                 success_callback=None, error_callback=None):
        """Create a runner.

        Args:
            git: Service used to check out the commit of the event.
            docker: Docker service (stored; not used yet, see the TODO in
                ``_handle_event``).
            success_callback: Optional zero-argument callable invoked after
                the event was handled without raising.
            error_callback: Optional callable invoked with the exception
                raised while handling the event.
        """
        super().__init__(daemon=True)
        self._git = git
        self._docker = docker
        self._success_callback = success_callback
        self._error_callback = error_callback
        self._event: WoodpeckerEvent | None = None

    def process_event(self, event: WoodpeckerEvent):
        """Remember *event* and start the process that handles it."""
        self._event = event
        self.start()

    def run(self):
        """Handle the stored event and report the outcome via the callbacks.

        Only the event handling itself is guarded by the ``try``; an
        exception raised by the success callback is not routed to the
        error callback, so a single run never reports both outcomes.
        """
        try:
            self._handle_event()
        except Exception as e:
            if self._error_callback is None:
                # Callbacks are optional; never try to call ``None``.
                logger.exception("Runner failed and no error callback is set.")
                return None
            return self._error_callback(e)

        if self._success_callback is None:
            return None
        return self._success_callback()

    def _handle_event(self):
        """Resolve the affected service and check out the event's commit."""
        service = self.get_service(self._event.files)
        if service is None:
            logger.info("No service found.")
            return

        self._git.checkout(self._event.commit)
        # TODO: check for *.mo.* files
        #       subs mo from pass
        #       docker compose up -d -f service/docker-compose.yml

    def get_service(self, files: list[str]) -> str | None:
        """Return the single service touched by *files*, if any.

        A path counts when its first segment is ``compose`` or ``files``;
        its second segment is taken as the service name. Paths without a
        second segment are ignored.

        Args:
            files: Changed file paths, using ``/`` as separator.

        Returns:
            The service name, or ``None`` when no supported path is present.

        Raises:
            Exception: If the paths reference more than one service.
        """
        services: list[str] = []
        for path in files:
            parts = path.split("/")
            if len(parts) < 2 or parts[0] not in ("compose", "files"):
                continue
            # Use the second path *segment* (not the second character of
            # the path) and keep first-seen order while de-duplicating.
            if parts[1] and parts[1] not in services:
                services.append(parts[1])

        if not services:
            return None
        if len(services) > 1:
            raise Exception("Multiple services are not supported.")
        return services[0]


@injectable(singleton=True)
class Woodpecker:
    """Singleton that serialises CI events: one runner at a time, FIFO queue.

    ``self._lock`` guards ``self._runner`` and ``self._pending``.
    ``multiprocessing.Lock`` is not reentrant, so helpers that are called
    with the lock held must not acquire it again.
    """

    @autowired
    def __init__(self, passwords: Annotated[Passwords, Autowired]):
        """Set up services, the pending-event queue and the guarding lock."""
        self._passwords = passwords
        self._git = GitService()
        self._docker = DockerService()
        self._runner: WoodpeckerRunner | None = None
        self._pending = deque()
        self._lock = Lock()
        logger.info("Woodpecker initialized.")

    def on_ci_event(self, event: WoodpeckerEvent):
        """Start a runner for *event*, or queue it if one is already busy."""
        logger.info(f"Received event: {event}")
        with self._lock:
            if self._pending or self._runner is not None:
                self._pending.append(event)
                return
            # Start while still holding the lock so a concurrent event
            # cannot also observe "no runner" and start a second one.
            self._start_runner(event)

    def _start_runner(self, event: WoodpeckerEvent):
        """Create and start a runner for *event*.

        The caller must hold ``self._lock``; it is deliberately not
        acquired here because the lock is not reentrant.
        """
        self._runner = WoodpeckerRunner(
            self._git,
            self._docker,
            self._on_runner_completed,
            self._on_runner_error,
        )
        self._runner.process_event(event)

    def _on_runner_completed(self):
        """Success callback: clear the runner and start the next event."""
        logger.info("Runner completed.")
        self._advance_queue()

    def _on_runner_error(self, t: Exception):
        """Error callback: log *t*, clear the runner, start the next event."""
        # Pass the exception itself so the traceback is logged even when
        # this is called outside an active ``except`` block.
        logger.error(f"Runner error: {t}", exc_info=t)
        self._advance_queue()

    def _advance_queue(self):
        """Join the finished runner and start the next pending event."""
        runner = self._runner
        if runner is not None:
            # NOTE(review): the callbacks are invoked from
            # WoodpeckerRunner.run, i.e. presumably inside the runner
            # process itself, where joining that same process is not
            # possible -- confirm how completion should reach the parent.
            runner.join()
        with self._lock:
            self._runner = None
            if self._pending:
                self._start_runner(self._pending.popleft())