Merge pull request 'Event bus & threads' (#16) from bus into develop

Reviewed-on: https://hattori.ztsh.eu/iac/karl/pulls/16
This commit is contained in:
Piotr Dec 2025-12-12 21:37:40 +01:00
commit a91b0caa2f
7 changed files with 76 additions and 42 deletions

View file

@ -1,6 +1,6 @@
logging:
level: "TRACE"
path: "../../logs/karl.log"
path: "logs/karl.log"
app:
host: "127.0.0.1"
port: 8081

View file

@ -20,6 +20,7 @@ dependencies = [
"fastapi-utils>=0.8.0",
"keyring>=25.6.0",
"keyring-backend>=0.1.0",
"bubus>=1.5.6",
]
[dependency-groups]

View file

@ -1,4 +1,7 @@
import logging
from automapper import mapper, exceptions
from bubus import EventBus
from fastapi import APIRouter, Depends
from fastapi_utils.cbv import cbv
from starlette.responses import JSONResponse, Response
@ -9,7 +12,7 @@ from core.woodpecker import Woodpecker
from model.webhook import WoodpeckerEvent
router = APIRouter()
logger = logging.getLogger(__name__)
@router.get("/", summary="Main API")
async def root():
@ -19,7 +22,7 @@ async def root():
@cbv(router)
class APIv1:
woodpecker: Woodpecker = Depends(AutowireSupport.woodpecker)
logger = __import__('logging').getLogger(__name__)
bus: EventBus = Depends(AutowireSupport.bus)
def __init__(self):
try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie
@ -34,5 +37,5 @@ class APIv1:
@router.post("/ci", summary="CI Webhook")
async def ci(self, request: Request):
self.woodpecker.on_ci_event(mapper.map(request))
await self.bus.dispatch(mapper.map(request))
return Response(status_code=201)

11
src/karl/core/events.py Normal file
View file

@ -0,0 +1,11 @@
import logging
from bubus import EventBus
from injectable import injectable_factory
logger = logging.getLogger(__name__)
@injectable_factory(EventBus, singleton=True)
def event_bus_factory() -> EventBus:
logger.info("Creating event bus...")
return EventBus()

View file

@ -1,3 +1,4 @@
from bubus import EventBus
from injectable import inject
from core.woodpecker import Woodpecker
@ -8,3 +9,7 @@ class AutowireSupport:
@staticmethod
def woodpecker():
return inject(Woodpecker)
@staticmethod
def bus():
return inject(EventBus)

View file

@ -1,9 +1,11 @@
import asyncio
import logging
from collections import deque
from multiprocessing import Process, Lock
from pathlib import Path
from threading import RLock, Thread
from typing import Annotated
from bubus import EventBus, BaseEvent
from injectable import injectable, Autowired, autowired
from config import get_settings
@ -14,15 +16,20 @@ from services.mo import Mo
logger = logging.getLogger(__name__)
class WoodpeckerRunner(Process):
class RunnerResult(BaseEvent):
success: bool = False
throwable: Exception | None = None
class WoodpeckerRunner(Thread):
@autowired
def __init__(self, git: GitService, docker: DockerService, mo: Mo,
success_callback=None, error_callback=None):
bus: Annotated[EventBus, Autowired]):
super().__init__(daemon=True)
self._git = git
self._docker = docker
self._mo = mo
self._success_callback = success_callback
self._error_callback = error_callback
self._bus = bus
self._event: WoodpeckerEvent | None = None
self._root = get_settings().git.path
@ -31,21 +38,27 @@ class WoodpeckerRunner(Process):
self.start()
def run(self):
async def dispatch(r: RunnerResult):
await self._bus.dispatch(r)
result = RunnerResult()
try:
service = self.get_service(self._event.files)
if service is None:
logger.info("No service found.")
return self._success_callback()
service_path = f"{self._root}/compose/{service}/docker-compose.yml"
self._git.checkout(self._event.commit)
for file in self._event.files:
if file.__contains__('.mo.'):
self._mo.process(Path(f"{self._root}{file}").absolute())
self._docker.reload(Path(service_path).absolute())
return self._success_callback()
result.success = True
else:
service_path = f"{self._root}/compose/{service}/docker-compose.yml"
self._git.checkout(self._event.commit)
for file in self._event.files:
if file.__contains__('.mo.'):
self._mo.process(Path(f"{self._root}{file}").absolute())
self._docker.reload(Path(service_path).absolute())
result.success = True
except Exception as e:
return self._error_callback(e)
result.throwable = e
asyncio.run(dispatch(result))
def get_service(self, files: list[str]) -> str | None:
supported_files = []
@ -65,42 +78,43 @@ class WoodpeckerRunner(Process):
@injectable(singleton=True)
class Woodpecker:
@autowired
def __init__(self, mo: Annotated[Mo, Autowired]):
def __init__(self, mo: Annotated[Mo, Autowired],
bus: Annotated[EventBus, Autowired]):
self._mo = mo
self._bus = bus
self._git = GitService()
self._docker = DockerService()
self._runner: WoodpeckerRunner | None = None
self._pending = deque()
self._lock = Lock()
self._lock = RLock()
bus.on(WoodpeckerEvent, self.on_ci_event)
bus.on(RunnerResult, self._on_runner_completed)
logger.info("Woodpecker initialized.")
def on_ci_event(self, event: WoodpeckerEvent):
logger.info(f"Received event: {event}")
async def on_ci_event(self, event: WoodpeckerEvent):
logger.debug(f"Received WoodpeckerEvent: {event.event_id}")
with self._lock:
logger.debug("Lock acquired [on-ci-event]")
if len(self._pending) > 0 or self._runner is not None:
self._pending.append(event)
return
self._start_runner(event)
else:
self._start_runner(event)
def _start_runner(self, event: WoodpeckerEvent):
with self._lock:
self._runner = WoodpeckerRunner(self._git, self._docker, self._mo,
self._on_runner_completed, self._on_runner_error)
logger.debug("Lock acquired [start-runner]")
self._runner = WoodpeckerRunner(self._git, self._docker, self._mo)
self._runner.process_event(event)
def _on_runner_completed(self):
logger.info("Runner completed.")
self._runner.join()
with self._lock:
self._runner = None
if len(self._pending) > 0:
event = self._pending.popleft()
self._start_runner(event)
def _on_runner_error(self, t: Exception):
logger.error(f"Runner error: {t}", exc_info=True)
self._runner.join()
def _on_runner_completed(self, result: RunnerResult):
logger.debug(f"Received RunnerResult: {result.event_id}")
logger.info(f"Runner completed {'successfully' if result.success else 'with error'}.")
if result.throwable is not None:
logger.error(f"Runner error: {result.throwable}", exc_info=True)
self._runner.join(timeout=1)
logger.debug("Runner joined.")
with self._lock:
logger.debug("Lock acquired [on-runner-completed]")
self._runner = None
if len(self._pending) > 0:
event = self._pending.popleft()

View file

@ -1,9 +1,9 @@
from dataclasses import dataclass
from typing import List
from bubus import BaseEvent
@dataclass
class WoodpeckerEvent:
class WoodpeckerEvent(BaseEvent):
_id: str
commit: str
message: str