feat: new event bus

This commit is contained in:
Piotr Dec 2025-12-11 22:55:17 +01:00
parent e1a03e555e
commit 7808a66342
Signed by: stawros
GPG key ID: 74B18A3F0F1E99C0
6 changed files with 32 additions and 8 deletions

View file

@ -20,6 +20,7 @@ dependencies = [
"fastapi-utils>=0.8.0", "fastapi-utils>=0.8.0",
"keyring>=25.6.0", "keyring>=25.6.0",
"keyring-backend>=0.1.0", "keyring-backend>=0.1.0",
"bubus>=1.5.6",
] ]
[dependency-groups] [dependency-groups]

View file

@ -1,4 +1,7 @@
import logging
from automapper import mapper, exceptions from automapper import mapper, exceptions
from bubus import EventBus
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
from fastapi_utils.cbv import cbv from fastapi_utils.cbv import cbv
from starlette.responses import JSONResponse, Response from starlette.responses import JSONResponse, Response
@ -9,7 +12,7 @@ from core.woodpecker import Woodpecker
from model.webhook import WoodpeckerEvent from model.webhook import WoodpeckerEvent
router = APIRouter() router = APIRouter()
logger = logging.getLogger(__name__)
@router.get("/", summary="Main API") @router.get("/", summary="Main API")
async def root(): async def root():
@ -19,7 +22,7 @@ async def root():
@cbv(router) @cbv(router)
class APIv1: class APIv1:
woodpecker: Woodpecker = Depends(AutowireSupport.woodpecker) woodpecker: Woodpecker = Depends(AutowireSupport.woodpecker)
logger = __import__('logging').getLogger(__name__) bus: EventBus = Depends(AutowireSupport.bus)
def __init__(self): def __init__(self):
try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie
@ -34,5 +37,5 @@ class APIv1:
@router.post("/ci", summary="CI Webhook") @router.post("/ci", summary="CI Webhook")
async def ci(self, request: Request): async def ci(self, request: Request):
self.woodpecker.on_ci_event(mapper.map(request)) await self.bus.dispatch(mapper.map(request))
return Response(status_code=201) return Response(status_code=201)

11
src/karl/core/events.py Normal file
View file

@ -0,0 +1,11 @@
import logging
from bubus import EventBus
from injectable import injectable_factory
logger = logging.getLogger(__name__)
@injectable_factory(EventBus, singleton=True)
def event_bus_factory() -> EventBus:
logger.info("Creating event bus...")
return EventBus()

View file

@ -1,3 +1,4 @@
from bubus import EventBus
from injectable import inject from injectable import inject
from core.woodpecker import Woodpecker from core.woodpecker import Woodpecker
@ -8,3 +9,7 @@ class AutowireSupport:
@staticmethod @staticmethod
def woodpecker(): def woodpecker():
return inject(Woodpecker) return inject(Woodpecker)
@staticmethod
def bus():
return inject(EventBus)

View file

@ -4,6 +4,7 @@ from pathlib import Path
from threading import RLock, Thread from threading import RLock, Thread
from typing import Annotated from typing import Annotated
from bubus import EventBus
from injectable import injectable, Autowired, autowired from injectable import injectable, Autowired, autowired
from config import get_settings from config import get_settings
@ -65,17 +66,20 @@ class WoodpeckerRunner(Thread):
@injectable(singleton=True) @injectable(singleton=True)
class Woodpecker: class Woodpecker:
@autowired @autowired
def __init__(self, mo: Annotated[Mo, Autowired]): def __init__(self, mo: Annotated[Mo, Autowired],
bus: Annotated[EventBus, Autowired]):
self._mo = mo self._mo = mo
self._bus = bus
self._git = GitService() self._git = GitService()
self._docker = DockerService() self._docker = DockerService()
self._runner: WoodpeckerRunner | None = None self._runner: WoodpeckerRunner | None = None
self._pending = deque() self._pending = deque()
self._lock = RLock() self._lock = RLock()
bus.on(WoodpeckerEvent, self.on_ci_event)
logger.info("Woodpecker initialized.") logger.info("Woodpecker initialized.")
def on_ci_event(self, event: WoodpeckerEvent): def on_ci_event(self, event: WoodpeckerEvent):
logger.info(f"Received event: {event}") logger.info(f"Received event: {event.event_id}")
with self._lock: with self._lock:
logger.debug("Lock acquired [on-ci-event]") logger.debug("Lock acquired [on-ci-event]")
if len(self._pending) > 0 or self._runner is not None: if len(self._pending) > 0 or self._runner is not None:

View file

@ -1,9 +1,9 @@
from dataclasses import dataclass
from typing import List from typing import List
from bubus import BaseEvent
@dataclass
class WoodpeckerEvent: class WoodpeckerEvent(BaseEvent):
_id: str _id: str
commit: str commit: str
message: str message: str