diff --git a/config/config.yaml b/config/config.yaml index e895bb7..c4fc44a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,6 +1,6 @@ logging: level: "TRACE" - path: "../../logs/karl.log" + path: "logs/karl.log" app: host: "127.0.0.1" port: 8081 diff --git a/pyproject.toml b/pyproject.toml index fa843df..4738496 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ "fastapi-utils>=0.8.0", "keyring>=25.6.0", "keyring-backend>=0.1.0", + "bubus>=1.5.6", ] [dependency-groups] diff --git a/src/karl/api/v1.py b/src/karl/api/v1.py index 9bbbfbc..3bc8090 100644 --- a/src/karl/api/v1.py +++ b/src/karl/api/v1.py @@ -1,4 +1,7 @@ +import logging + from automapper import mapper, exceptions +from bubus import EventBus from fastapi import APIRouter, Depends from fastapi_utils.cbv import cbv from starlette.responses import JSONResponse, Response @@ -9,7 +12,7 @@ from core.woodpecker import Woodpecker from model.webhook import WoodpeckerEvent router = APIRouter() - +logger = logging.getLogger(__name__) @router.get("/", summary="Main API") async def root(): @@ -19,7 +22,7 @@ async def root(): @cbv(router) class APIv1: woodpecker: Woodpecker = Depends(AutowireSupport.woodpecker) - logger = __import__('logging').getLogger(__name__) + bus: EventBus = Depends(AutowireSupport.bus) def __init__(self): try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie @@ -34,5 +37,5 @@ class APIv1: @router.post("/ci", summary="CI Webhook") async def ci(self, request: Request): - self.woodpecker.on_ci_event(mapper.map(request)) + await self.bus.dispatch(mapper.map(request)) return Response(status_code=201) diff --git a/src/karl/core/events.py b/src/karl/core/events.py new file mode 100644 index 0000000..456968b --- /dev/null +++ b/src/karl/core/events.py @@ -0,0 +1,11 @@ +import logging + +from bubus import EventBus +from injectable import injectable_factory + +logger = logging.getLogger(__name__) + +@injectable_factory(EventBus, singleton=True) +def event_bus_factory() -> EventBus: + logger.info("Creating event bus...") + return EventBus() diff --git a/src/karl/core/injects.py b/src/karl/core/injects.py index 623fa59..e7081a7 100644 --- a/src/karl/core/injects.py +++ b/src/karl/core/injects.py @@ -1,3 +1,4 @@ +from bubus import EventBus from injectable import inject from core.woodpecker import Woodpecker @@ -8,3 +9,7 @@ class AutowireSupport: @staticmethod def woodpecker(): return inject(Woodpecker) + + @staticmethod + def bus(): + return inject(EventBus) diff --git a/src/karl/core/woodpecker.py b/src/karl/core/woodpecker.py index 2b66014..24af84c 100644 --- a/src/karl/core/woodpecker.py +++ b/src/karl/core/woodpecker.py @@ -1,9 +1,11 @@ +import asyncio import logging from collections import deque -from multiprocessing import Process, Lock from pathlib import Path +from threading import RLock, Thread from typing import Annotated +from bubus import EventBus, BaseEvent from injectable import injectable, Autowired, autowired from config import get_settings @@ -14,15 +16,20 @@ from services.mo import Mo logger = logging.getLogger(__name__) -class WoodpeckerRunner(Process): +class RunnerResult(BaseEvent): + success: bool = False + throwable: Exception | None = None + + +class WoodpeckerRunner(Thread): + @autowired def __init__(self, git: GitService, docker: DockerService, mo: Mo, - success_callback=None, error_callback=None): + bus: Annotated[EventBus, Autowired]): super().__init__(daemon=True) self._git = git self._docker = docker self._mo = mo - self._success_callback = success_callback - self._error_callback = error_callback + self._bus = bus self._event: WoodpeckerEvent | None = None self._root = get_settings().git.path @@ -31,21 +38,27 @@ class WoodpeckerRunner(Process): self.start() def run(self): + async def dispatch(r: RunnerResult): + await self._bus.dispatch(r) + + result = RunnerResult() try: service = self.get_service(self._event.files) if service is None: logger.info("No service found.") - return self._success_callback() - service_path = f"{self._root}/compose/{service}/docker-compose.yml" - self._git.checkout(self._event.commit) - for file in self._event.files: - if file.__contains__('.mo.'): - self._mo.process(Path(f"{self._root}{file}").absolute()) - self._docker.reload(Path(service_path).absolute()) - - return self._success_callback() + result.success = True + else: + service_path = f"{self._root}/compose/{service}/docker-compose.yml" + self._git.checkout(self._event.commit) + for file in self._event.files: + if file.__contains__('.mo.'): + self._mo.process(Path(f"{self._root}{file}").absolute()) + self._docker.reload(Path(service_path).absolute()) + result.success = True except Exception as e: - return self._error_callback(e) + result.throwable = e + + asyncio.run(dispatch(result)) def get_service(self, files: list[str]) -> str | None: supported_files = [] @@ -65,42 +78,43 @@ class WoodpeckerRunner(Process): @injectable(singleton=True) class Woodpecker: @autowired - def __init__(self, mo: Annotated[Mo, Autowired]): + def __init__(self, mo: Annotated[Mo, Autowired], + bus: Annotated[EventBus, Autowired]): self._mo = mo + self._bus = bus self._git = GitService() self._docker = DockerService() self._runner: WoodpeckerRunner | None = None self._pending = deque() - self._lock = Lock() + self._lock = RLock() + bus.on(WoodpeckerEvent, self.on_ci_event) + bus.on(RunnerResult, self._on_runner_completed) logger.info("Woodpecker initialized.") - def on_ci_event(self, event: WoodpeckerEvent): - logger.info(f"Received event: {event}") + async def on_ci_event(self, event: WoodpeckerEvent): + logger.debug(f"Received WoodpeckerEvent: {event.event_id}") with self._lock: + logger.debug("Lock acquired [on-ci-event]") if len(self._pending) > 0 or self._runner is not None: self._pending.append(event) - return - self._start_runner(event) + else: + self._start_runner(event) def _start_runner(self, event: WoodpeckerEvent): with self._lock: - self._runner = WoodpeckerRunner(self._git, self._docker, self._mo, - self._on_runner_completed, self._on_runner_error) + logger.debug("Lock acquired [start-runner]") + self._runner = WoodpeckerRunner(self._git, self._docker, self._mo) self._runner.process_event(event) - def _on_runner_completed(self): - logger.info("Runner completed.") - self._runner.join() - with self._lock: - self._runner = None - if len(self._pending) > 0: - event = self._pending.popleft() - self._start_runner(event) - - def _on_runner_error(self, t: Exception): - logger.error(f"Runner error: {t}", exc_info=True) - self._runner.join() + def _on_runner_completed(self, result: RunnerResult): + logger.debug(f"Received RunnerResult: {result.event_id}") + logger.info(f"Runner completed {'successfully' if result.success else 'with error'}.") + if result.throwable is not None: + logger.error(f"Runner error: {result.throwable}", exc_info=True) + self._runner.join(timeout=1) + logger.debug("Runner joined.") with self._lock: + logger.debug("Lock acquired [on-runner-completed]") self._runner = None if len(self._pending) > 0: event = self._pending.popleft() diff --git a/src/karl/model/webhook.py b/src/karl/model/webhook.py index a2ef7dc..e97c073 100644 --- a/src/karl/model/webhook.py +++ b/src/karl/model/webhook.py @@ -1,9 +1,9 @@ -from dataclasses import dataclass from typing import List +from bubus import BaseEvent -@dataclass -class WoodpeckerEvent: + +class WoodpeckerEvent(BaseEvent): _id: str commit: str message: str