import logging
from collections import deque
from multiprocessing import Process, Lock
from typing import Annotated

from injectable import injectable, Autowired, autowired

from app.model.webhook import WoodpeckerEvent
from app.services import Passwords, GitService, DockerService

logger = logging.getLogger(__name__)


class WoodpeckerRunner(Process):
    """Child process that carries a single ``WoodpeckerEvent``.

    The event is stored on the instance at construction time; ``run`` does
    not use it yet (see the TODO sketch inside ``run``).
    """

    def __init__(self, event: WoodpeckerEvent):
        """Store *event* on the process object.

        Args:
            event: The event this runner was created for.
        """
        super().__init__()
        self._event = event

    def run(self):
        """Process entry point; currently only delegates to ``Process.run``."""
        super().run()

        # TODO: implement the pipeline. Draft kept from an earlier sketch
        # (names such as ``task``, ``Result``, ``self._git`` and
        # ``self._get_service`` are not defined on this class):
        #
        #     event: WebhookEvent = task.payload
        #     # TODO: persist event data
        #     commit_hash = self._git.get_new_commit_hash()
        #     if commit_hash != event.commit:
        #         logger.warning(f"Commit hash mismatch: {commit_hash} != {event.commit}")
        #         return Result(task.id, False, "Commit hash mismatch")
        #     # TODO: persist commit data
        #     service = self._get_service(event.files)


@injectable(singleton=True)
class Woodpecker:
    """Singleton that serialises CI events: one runner at a time, FIFO backlog.

    An incoming event is started immediately when nothing is running and
    nothing is queued; otherwise it is appended to ``_pending`` and picked up
    by ``_on_runner_completed``.
    """

    @autowired
    def __init__(self, passwords: Annotated[Passwords, Autowired]):
        """Set up services, the empty backlog and the guarding lock.

        Args:
            passwords: Injected ``Passwords`` service.
        """
        self._passwords = passwords
        self._git = GitService()
        self._docker = DockerService()
        # The runner currently in flight, or None when idle.
        self._runner: WoodpeckerRunner | None = None
        # Events waiting for the current runner to finish (FIFO).
        self._pending: deque[WoodpeckerEvent] = deque()
        # Guards ``_runner`` and ``_pending``. multiprocessing.Lock is
        # non-recursive, so code running while it is held must not try to
        # acquire it again.
        self._lock = Lock()
        logger.info("Woodpecker initialized.")

    def on_ci_event(self, event: WoodpeckerEvent):
        """Handle a new CI event: start it now or queue it.

        Args:
            event: The event to process.
        """
        logger.info("Received event: %s", event)
        with self._lock:
            # Anything queued or running means this event must wait its turn.
            if self._pending or self._runner is not None:
                self._pending.append(event)
                return
            # Started while the lock is held so the idle check and the start
            # cannot interleave with another caller.
            self._start_runner(event)

    def _start_runner(self, event: WoodpeckerEvent):
        """Start a runner for *event*.

        Not implemented yet. NOTE(review): presumably this should create a
        ``WoodpeckerRunner``, assign it to ``self._runner`` and start it --
        confirm. Callers in this class invoke it with ``self._lock`` held.
        """
        pass

    def _on_runner_completed(self):
        """Reap the finished runner and start the next pending event, if any."""
        logger.info("Runner completed.")
        finished = self._runner
        # Guard against being called while no runner is registered; the
        # unconditional ``join`` would raise AttributeError on None.
        if finished is not None:
            finished.join()
        with self._lock:
            self._runner = None
            if self._pending:
                self._start_runner(self._pending.popleft())