Event bus basics

This commit is contained in:
Piotr Dec 2025-10-31 00:01:45 +01:00
parent 1440ec51b7
commit 87e8af3f72
Signed by: stawros
GPG key ID: 74B18A3F0F1E99C0
8 changed files with 88 additions and 16 deletions

View file

@ -1,4 +1,4 @@
from automapper import mapper from automapper import mapper, exceptions
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
from fastapi_utils.cbv import cbv from fastapi_utils.cbv import cbv
from starlette.responses import JSONResponse, Response from starlette.responses import JSONResponse, Response
@ -6,6 +6,7 @@ from starlette.responses import JSONResponse, Response
from app.api.models import Request from app.api.models import Request
from app.core.core import WebhookProcessor from app.core.core import WebhookProcessor
from app.core.injects import AutowireSupport from app.core.injects import AutowireSupport
from app.events import SimpleEventBus
from app.model.webhook import WebhookEvent from app.model.webhook import WebhookEvent
router = APIRouter() router = APIRouter()
@ -19,16 +20,21 @@ async def root():
@cbv(router) @cbv(router)
class APIv1: class APIv1:
webhook_service: WebhookProcessor = Depends(AutowireSupport.webhook_processor) webhook_service: WebhookProcessor = Depends(AutowireSupport.webhook_processor)
event_bus: SimpleEventBus = Depends(AutowireSupport.event_bus)
logger = __import__('logging').getLogger(__name__) logger = __import__('logging').getLogger(__name__)
def __init__(self): def __init__(self):
try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie
mapper.add(Request, WebhookEvent) mapper.add(Request, WebhookEvent)
except exceptions.DuplicatedRegistrationError:
pass
@router.get("/health", summary="Health check") @router.get("/health", summary="Health check")
async def health(self) -> JSONResponse: async def health(self) -> JSONResponse:
return JSONResponse({"status": self.webhook_service.health}) # TODO: JSON serialize
return JSONResponse({"status": self.webhook_service.health.healthy})
@router.post("/ci", summary="CI Webhook") @router.post("/ci", summary="CI Webhook")
async def ci(self, request: Request): async def ci(self, request: Request):
self.webhook_service.enqueue(mapper.map(request)) self.event_bus.publish(mapper.map(request))
return Response(status_code=201) return Response(status_code=201)

View file

@ -3,6 +3,7 @@ import uuid
from typing import Annotated, List from typing import Annotated, List
from injectable import injectable, autowired, Autowired from injectable import injectable, autowired, Autowired
from typing_extensions import deprecated
from app.core.queue import EnqueuedProcessor, ProcessQueue, Task, Result from app.core.queue import EnqueuedProcessor, ProcessQueue, Task, Result
from app.model.healthcheck import HealthCheck from app.model.healthcheck import HealthCheck
@ -11,20 +12,19 @@ from app.services import DockerService, GitService, Passwords
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@deprecated("Use event bus instead.")
@injectable(singleton=True) @injectable(singleton=True)
class WebhookProcessor(EnqueuedProcessor): class WebhookProcessor:
@autowired @autowired
def __init__(self, docker: Annotated[DockerService, Autowired], def __init__(self, docker: Annotated[DockerService, Autowired],
git: Annotated[GitService, Autowired], git: Annotated[GitService, Autowired],
keepass: Annotated[Passwords, Autowired], keepass: Annotated[Passwords, Autowired]):
queue: Annotated[ProcessQueue, Autowired]):
super().__init__(queue)
self._docker = docker self._docker = docker
self._git = git self._git = git
self._keepass = keepass self._keepass = keepass
def enqueue(self, event: WebhookEvent): def enqueue(self, event: WebhookEvent):
self._enqueue(Task(uuid.UUID(), self, event)) pass
def _process(self, task: Task[WebhookEvent]) -> Result: def _process(self, task: Task[WebhookEvent]) -> Result:
event: WebhookEvent = task.payload event: WebhookEvent = task.payload

View file

@ -1,6 +1,7 @@
from injectable import inject from injectable import inject
from app.core.core import WebhookProcessor from app.core.core import WebhookProcessor
from app.events import SimpleEventBus
class AutowireSupport: class AutowireSupport:
@ -8,3 +9,7 @@ class AutowireSupport:
@staticmethod @staticmethod
def webhook_processor(): def webhook_processor():
return inject(WebhookProcessor) return inject(WebhookProcessor)
@staticmethod
def event_bus():
return inject(SimpleEventBus)

22
app/core/woodpecker.py Normal file
View file

@ -0,0 +1,22 @@
import logging
from injectable import injectable
from app.events import SimpleEventBus
from app.model.webhook import WebhookEvent
logger = logging.getLogger(__name__)
@injectable
class Woodpecker:
@SimpleEventBus.on(WebhookEvent)
def on_event(self, event): # TODO: caller nie działa -> brakuje instancji klasy?
logger.info(f"Received event: {event}")
pass
@SimpleEventBus.on(WebhookEvent)
def on_event2(event): # TODO: Tu działa
logger.info(f"F2: Received event: {event}")
pass

41
app/events/__init__.py Normal file
View file

@ -0,0 +1,41 @@
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from functools import wraps
from typing import Dict, List, Callable
from injectable import injectable, inject
@dataclass
class Event:
pass
@injectable(singleton=True)
class SimpleEventBus:
def __init__(self):
self._handlers: Dict[type, List[Callable]] = {}
self._executor = ThreadPoolExecutor()
def publish(self, event: Event) -> None:
for handler in self._handlers.get(type(event), []):
# Fire-and-forget execution
self._executor.submit(handler, event)
def subscribe(self, event_type: type, handler: Callable) -> None:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
@staticmethod
def on(event: type) -> Callable:
def outer(func):
inject(SimpleEventBus).subscribe(event, func)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return outer

View file

@ -11,11 +11,11 @@ class KarlApplication:
from starlette.types import Receive, Scope, Send from starlette.types import Receive, Scope, Send
def __init__(self) -> None: def __init__(self) -> None:
self._set_logging() self._set_logging()
load_injection_container()
_app = FastAPI(title="Karl", version="0.1.0") _app = FastAPI(title="Karl", version="0.1.0")
self._set_middlewares(_app) self._set_middlewares(_app)
self._set_routes(_app) self._set_routes(_app)
self._set_events(_app) self._set_events(_app)
self._init_services()
self._app = _app self._app = _app
@ -58,10 +58,6 @@ class KarlApplication:
def _set_events(self, app: FastAPI): def _set_events(self, app: FastAPI):
pass pass
def _init_services(self):
logger = logging.getLogger(__name__)
load_injection_container()
def run(): def run():
return KarlApplication() return KarlApplication()

View file

@ -1,9 +1,11 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import List from typing import List
from app.events import Event
@dataclass @dataclass
class WebhookEvent: class WebhookEvent(Event):
_id: str _id: str
commit: str commit: str
message: str message: str

View file

@ -11,7 +11,7 @@ class Passwords:
settings = get_settings() settings = get_settings()
with open(settings.kp.secret, "r") as fh: with open(settings.kp.secret, "r") as fh:
secret = fh.read() secret = fh.read().splitlines()[0]
self._kp_org = self.__get_or_create_store(settings.kp.file, secret) self._kp_org = self.__get_or_create_store(settings.kp.file, secret)
self._kp = self.__get_lock(settings.kp.file, secret) self._kp = self.__get_lock(settings.kp.file, secret)