diff --git a/src/karl/core/woodpecker.py b/src/karl/core/woodpecker.py index 19ec9df..24af84c 100644 --- a/src/karl/core/woodpecker.py +++ b/src/karl/core/woodpecker.py @@ -1,10 +1,11 @@ +import asyncio import logging from collections import deque from pathlib import Path from threading import RLock, Thread from typing import Annotated -from bubus import EventBus +from bubus import EventBus, BaseEvent from injectable import injectable, Autowired, autowired from config import get_settings @@ -15,15 +16,20 @@ from services.mo import Mo logger = logging.getLogger(__name__) +class RunnerResult(BaseEvent): + success: bool = False + throwable: Exception | None = None + + class WoodpeckerRunner(Thread): + @autowired def __init__(self, git: GitService, docker: DockerService, mo: Mo, - success_callback=None, error_callback=None): + bus: Annotated[EventBus, Autowired]): super().__init__(daemon=True) self._git = git self._docker = docker self._mo = mo - self._success_callback = success_callback - self._error_callback = error_callback + self._bus = bus self._event: WoodpeckerEvent | None = None self._root = get_settings().git.path @@ -32,21 +38,27 @@ class WoodpeckerRunner(Thread): self.start() def run(self): + async def dispatch(r: RunnerResult): + await self._bus.dispatch(r) + + result = RunnerResult() try: service = self.get_service(self._event.files) if service is None: logger.info("No service found.") - return self._success_callback() - service_path = f"{self._root}/compose/{service}/docker-compose.yml" - self._git.checkout(self._event.commit) - for file in self._event.files: - if file.__contains__('.mo.'): - self._mo.process(Path(f"{self._root}{file}").absolute()) - self._docker.reload(Path(service_path).absolute()) - - return self._success_callback() + result.success = True + else: + service_path = f"{self._root}/compose/{service}/docker-compose.yml" + self._git.checkout(self._event.commit) + for file in self._event.files: + if file.__contains__('.mo.'): + self._mo.process(Path(f"{self._root}{file}").absolute()) + self._docker.reload(Path(service_path).absolute()) + result.success = True except Exception as e: - return self._error_callback(e) + result.throwable = e + + asyncio.run(dispatch(result)) def get_service(self, files: list[str]) -> str | None: supported_files = [] @@ -76,40 +88,34 @@ class Woodpecker: self._pending = deque() self._lock = RLock() bus.on(WoodpeckerEvent, self.on_ci_event) + bus.on(RunnerResult, self._on_runner_completed) logger.info("Woodpecker initialized.") - def on_ci_event(self, event: WoodpeckerEvent): - logger.info(f"Received event: {event.event_id}") + async def on_ci_event(self, event: WoodpeckerEvent): + logger.debug(f"Received WoodpeckerEvent: {event.event_id}") with self._lock: logger.debug("Lock acquired [on-ci-event]") if len(self._pending) > 0 or self._runner is not None: self._pending.append(event) - return - self._start_runner(event) + else: + self._start_runner(event) def _start_runner(self, event: WoodpeckerEvent): with self._lock: logger.debug("Lock acquired [start-runner]") - self._runner = WoodpeckerRunner(self._git, self._docker, self._mo, - self._on_runner_completed, self._on_runner_error) + self._runner = WoodpeckerRunner(self._git, self._docker, self._mo) self._runner.process_event(event) - def _on_runner_completed(self): - logger.info("Runner completed.") - self._runner.join() + def _on_runner_completed(self, result: RunnerResult): + logger.debug(f"Received RunnerResult: {result.event_id}") + logger.info(f"Runner completed {'successfully' if result.success else 'with error'}.") + if result.throwable is not None: + logger.error(f"Runner error: {result.throwable}", exc_info=True) + self._runner.join(timeout=1) + logger.debug("Runner joined.") with self._lock: logger.debug("Lock acquired [on-runner-completed]") self._runner = None if len(self._pending) > 0: event = self._pending.popleft() self._start_runner(event) - - def _on_runner_error(self, t: Exception): - logger.error(f"Runner error: {t}", exc_info=True) - self._runner.join() - with self._lock: - logger.debug("Lock acquired [on-runner-error]") - self._runner = None - if len(self._pending) > 0: - event = self._pending.popleft() - self._start_runner(event)