From c937893fbdd0a3a07828f8fe8502099cfb71cc8f Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Wed, 10 Dec 2025 21:47:56 +0100 Subject: [PATCH 1/4] fix: Locks verbosity --- config/config.yaml | 2 +- src/karl/core/woodpecker.py | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index e895bb7..c4fc44a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -1,6 +1,6 @@ logging: level: "TRACE" - path: "../../logs/karl.log" + path: "logs/karl.log" app: host: "127.0.0.1" port: 8081 diff --git a/src/karl/core/woodpecker.py b/src/karl/core/woodpecker.py index 2b66014..6c0a360 100644 --- a/src/karl/core/woodpecker.py +++ b/src/karl/core/woodpecker.py @@ -1,7 +1,8 @@ import logging from collections import deque -from multiprocessing import Process, Lock +from multiprocessing import Process from pathlib import Path +from threading import RLock from typing import Annotated from injectable import injectable, Autowired, autowired @@ -71,12 +72,13 @@ class Woodpecker: self._docker = DockerService() self._runner: WoodpeckerRunner | None = None self._pending = deque() - self._lock = Lock() + self._lock = RLock() logger.info("Woodpecker initialized.") def on_ci_event(self, event: WoodpeckerEvent): logger.info(f"Received event: {event}") with self._lock: + logger.debug("Lock acquired [on-ci-event]") if len(self._pending) > 0 or self._runner is not None: self._pending.append(event) return @@ -84,6 +86,7 @@ class Woodpecker: def _start_runner(self, event: WoodpeckerEvent): with self._lock: + logger.debug("Lock acquired [start-runner]") self._runner = WoodpeckerRunner(self._git, self._docker, self._mo, self._on_runner_completed, self._on_runner_error) self._runner.process_event(event) @@ -92,6 +95,7 @@ class Woodpecker: logger.info("Runner completed.") self._runner.join() with self._lock: + logger.debug("Lock acquired [on-runner-completed]") self._runner = None if len(self._pending) > 0: event = self._pending.popleft() @@ -101,6 +105,7 @@ class Woodpecker: logger.error(f"Runner error: {t}", exc_info=True) self._runner.join() with self._lock: + logger.debug("Lock acquired [on-runner-error]") self._runner = None if len(self._pending) > 0: event = self._pending.popleft() From e1a03e555efa06d604885f1c4f568b4f67b93c28 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Wed, 10 Dec 2025 22:55:25 +0100 Subject: [PATCH 2/4] fix: Process -> Thread --- src/karl/core/woodpecker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/karl/core/woodpecker.py b/src/karl/core/woodpecker.py index 6c0a360..2e83c07 100644 --- a/src/karl/core/woodpecker.py +++ b/src/karl/core/woodpecker.py @@ -1,8 +1,7 @@ import logging from collections import deque -from multiprocessing import Process from pathlib import Path -from threading import RLock +from threading import RLock, Thread from typing import Annotated from injectable import injectable, Autowired, autowired @@ -15,7 +14,7 @@ from services.mo import Mo logger = logging.getLogger(__name__) -class WoodpeckerRunner(Process): +class WoodpeckerRunner(Thread): def __init__(self, git: GitService, docker: DockerService, mo: Mo, success_callback=None, error_callback=None): super().__init__(daemon=True) From 7808a663427f04653aaf47e030bb6b7eb87d6217 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 11 Dec 2025 22:55:17 +0100 Subject: [PATCH 3/4] feat: new event bus --- pyproject.toml | 1 + src/karl/api/v1.py | 9 ++++++--- src/karl/core/events.py | 11 +++++++++++ src/karl/core/injects.py | 5 +++++ src/karl/core/woodpecker.py | 8 ++++++-- src/karl/model/webhook.py | 6 +++--- 6 files changed, 32 insertions(+), 8 deletions(-) create mode 100644 src/karl/core/events.py diff --git a/pyproject.toml b/pyproject.toml index fa843df..4738496 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ "fastapi-utils>=0.8.0", "keyring>=25.6.0", "keyring-backend>=0.1.0", + "bubus>=1.5.6", ] [dependency-groups] diff --git a/src/karl/api/v1.py b/src/karl/api/v1.py index 9bbbfbc..3bc8090 100644 --- a/src/karl/api/v1.py +++ b/src/karl/api/v1.py @@ -1,4 +1,7 @@ +import logging + from automapper import mapper, exceptions +from bubus import EventBus from fastapi import APIRouter, Depends from fastapi_utils.cbv import cbv from starlette.responses import JSONResponse, Response @@ -9,7 +12,7 @@ from core.woodpecker import Woodpecker from model.webhook import WoodpeckerEvent router = APIRouter() - +logger = logging.getLogger(__name__) @router.get("/", summary="Main API") async def root(): @@ -19,7 +22,7 @@ async def root(): @cbv(router) class APIv1: woodpecker: Woodpecker = Depends(AutowireSupport.woodpecker) - logger = __import__('logging').getLogger(__name__) + bus: EventBus = Depends(AutowireSupport.bus) def __init__(self): try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie @@ -34,5 +37,5 @@ class APIv1: @router.post("/ci", summary="CI Webhook") async def ci(self, request: Request): - self.woodpecker.on_ci_event(mapper.map(request)) + await self.bus.dispatch(mapper.map(request)) return Response(status_code=201) diff --git a/src/karl/core/events.py b/src/karl/core/events.py new file mode 100644 index 0000000..456968b --- /dev/null +++ b/src/karl/core/events.py @@ -0,0 +1,11 @@ +import logging + +from bubus import EventBus +from injectable import injectable_factory + +logger = logging.getLogger(__name__) + +@injectable_factory(EventBus, singleton=True) +def event_bus_factory() -> EventBus: + logger.info("Creating event bus...") + return EventBus() diff --git a/src/karl/core/injects.py b/src/karl/core/injects.py index 623fa59..e7081a7 100644 --- a/src/karl/core/injects.py +++ b/src/karl/core/injects.py @@ -1,3 +1,4 @@ +from bubus import EventBus from injectable import inject from core.woodpecker import Woodpecker @@ -8,3 +9,7 @@ class AutowireSupport: @staticmethod def woodpecker(): return inject(Woodpecker) + + @staticmethod + def bus(): + return inject(EventBus) diff --git a/src/karl/core/woodpecker.py b/src/karl/core/woodpecker.py index 2e83c07..19ec9df 100644 --- a/src/karl/core/woodpecker.py +++ b/src/karl/core/woodpecker.py @@ -4,6 +4,7 @@ from pathlib import Path from threading import RLock, Thread from typing import Annotated +from bubus import EventBus from injectable import injectable, Autowired, autowired from config import get_settings @@ -65,17 +66,20 @@ class WoodpeckerRunner(Thread): @injectable(singleton=True) class Woodpecker: @autowired - def __init__(self, mo: Annotated[Mo, Autowired]): + def __init__(self, mo: Annotated[Mo, Autowired], + bus: Annotated[EventBus, Autowired]): self._mo = mo + self._bus = bus self._git = GitService() self._docker = DockerService() self._runner: WoodpeckerRunner | None = None self._pending = deque() self._lock = RLock() + bus.on(WoodpeckerEvent, self.on_ci_event) logger.info("Woodpecker initialized.") def on_ci_event(self, event: WoodpeckerEvent): - logger.info(f"Received event: {event}") + logger.info(f"Received event: {event.event_id}") with self._lock: logger.debug("Lock acquired [on-ci-event]") if len(self._pending) > 0 or self._runner is not None: diff --git a/src/karl/model/webhook.py b/src/karl/model/webhook.py index a2ef7dc..e97c073 100644 --- a/src/karl/model/webhook.py +++ b/src/karl/model/webhook.py @@ -1,9 +1,9 @@ -from dataclasses import dataclass from typing import List +from bubus import BaseEvent -@dataclass -class WoodpeckerEvent: + +class WoodpeckerEvent(BaseEvent): _id: str commit: str message: str From 71de3c76c6c8733026424950fb6fe0cee79d5e6e Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 12 Dec 2025 21:24:53 +0100 Subject: [PATCH 4/4] feat: WoodpeckerRunner events --- src/karl/core/woodpecker.py | 72 ++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/src/karl/core/woodpecker.py b/src/karl/core/woodpecker.py index 19ec9df..24af84c 100644 --- a/src/karl/core/woodpecker.py +++ b/src/karl/core/woodpecker.py @@ -1,10 +1,11 @@ +import asyncio import logging from collections import deque from pathlib import Path from threading import RLock, Thread from typing import Annotated -from bubus import EventBus +from bubus import EventBus, BaseEvent from injectable import injectable, Autowired, autowired from config import get_settings @@ -15,15 +16,20 @@ from services.mo import Mo logger = logging.getLogger(__name__) +class RunnerResult(BaseEvent): + success: bool = False + throwable: Exception | None = None + + class WoodpeckerRunner(Thread): + @autowired def __init__(self, git: GitService, docker: DockerService, mo: Mo, - success_callback=None, error_callback=None): + bus: Annotated[EventBus, Autowired]): super().__init__(daemon=True) self._git = git self._docker = docker self._mo = mo - self._success_callback = success_callback - self._error_callback = error_callback + self._bus = bus self._event: WoodpeckerEvent | None = None self._root = get_settings().git.path @@ -32,21 +38,27 @@ class WoodpeckerRunner(Thread): self.start() def run(self): + async def dispatch(r: RunnerResult): + await self._bus.dispatch(r) + + result = RunnerResult() try: service = self.get_service(self._event.files) if service is None: logger.info("No service found.") - return self._success_callback() - service_path = f"{self._root}/compose/{service}/docker-compose.yml" - self._git.checkout(self._event.commit) - for file in self._event.files: - if file.__contains__('.mo.'): - self._mo.process(Path(f"{self._root}{file}").absolute()) - self._docker.reload(Path(service_path).absolute()) - - return self._success_callback() + result.success = True + else: + service_path = f"{self._root}/compose/{service}/docker-compose.yml" + self._git.checkout(self._event.commit) + for file in self._event.files: + if file.__contains__('.mo.'): + self._mo.process(Path(f"{self._root}{file}").absolute()) + self._docker.reload(Path(service_path).absolute()) + result.success = True except Exception as e: - return self._error_callback(e) + result.throwable = e + + asyncio.run(dispatch(result)) def get_service(self, files: list[str]) -> str | None: supported_files = [] @@ -76,40 +88,34 @@ class Woodpecker: self._pending = deque() self._lock = RLock() bus.on(WoodpeckerEvent, self.on_ci_event) + bus.on(RunnerResult, self._on_runner_completed) logger.info("Woodpecker initialized.") - def on_ci_event(self, event: WoodpeckerEvent): - logger.info(f"Received event: {event.event_id}") + async def on_ci_event(self, event: WoodpeckerEvent): + logger.debug(f"Received WoodpeckerEvent: {event.event_id}") with self._lock: logger.debug("Lock acquired [on-ci-event]") if len(self._pending) > 0 or self._runner is not None: self._pending.append(event) - return - self._start_runner(event) + else: + self._start_runner(event) def _start_runner(self, event: WoodpeckerEvent): with self._lock: logger.debug("Lock acquired [start-runner]") - self._runner = WoodpeckerRunner(self._git, self._docker, self._mo, - self._on_runner_completed, self._on_runner_error) + self._runner = WoodpeckerRunner(self._git, self._docker, self._mo) self._runner.process_event(event) - def _on_runner_completed(self): - logger.info("Runner completed.") - self._runner.join() + def _on_runner_completed(self, result: RunnerResult): + logger.debug(f"Received RunnerResult: {result.event_id}") + logger.info(f"Runner completed {'successfully' if result.success else 'with error'}.") + if result.throwable is not None: + logger.error(f"Runner error: {result.throwable}", exc_info=True) + self._runner.join(timeout=1) + logger.debug("Runner joined.") with self._lock: logger.debug("Lock acquired [on-runner-completed]") self._runner = None if len(self._pending) > 0: event = self._pending.popleft() self._start_runner(event) - - def _on_runner_error(self, t: Exception): - logger.error(f"Runner error: {t}", exc_info=True) - self._runner.join() - with self._lock: - logger.debug("Lock acquired [on-runner-error]") - self._runner = None - if len(self._pending) > 0: - event = self._pending.popleft() - self._start_runner(event)