From 87e8af3f729fa24fa02c5758ae9af7a9406b9698 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 31 Oct 2025 00:01:45 +0100 Subject: [PATCH] Event bus basics --- app/api/v1.py | 14 +++++++++---- app/core/core.py | 10 +++++----- app/core/injects.py | 5 +++++ app/core/woodpecker.py | 22 +++++++++++++++++++++ app/events/__init__.py | 41 +++++++++++++++++++++++++++++++++++++++ app/main.py | 6 +----- app/model/webhook.py | 4 +++- app/services/passwords.py | 2 +- 8 files changed, 88 insertions(+), 16 deletions(-) create mode 100644 app/core/woodpecker.py create mode 100644 app/events/__init__.py diff --git a/app/api/v1.py b/app/api/v1.py index 3950b3b..4fbe31f 100644 --- a/app/api/v1.py +++ b/app/api/v1.py @@ -1,4 +1,4 @@ -from automapper import mapper +from automapper import mapper, exceptions from fastapi import APIRouter, Depends from fastapi_utils.cbv import cbv from starlette.responses import JSONResponse, Response @@ -6,6 +6,7 @@ from starlette.responses import JSONResponse, Response from app.api.models import Request from app.core.core import WebhookProcessor from app.core.injects import AutowireSupport +from app.events import SimpleEventBus from app.model.webhook import WebhookEvent router = APIRouter() @@ -19,16 +20,21 @@ async def root(): @cbv(router) class APIv1: webhook_service: WebhookProcessor = Depends(AutowireSupport.webhook_processor) + event_bus: SimpleEventBus = Depends(AutowireSupport.event_bus) logger = __import__('logging').getLogger(__name__) def __init__(self): - mapper.add(Request, WebhookEvent) + try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie + mapper.add(Request, WebhookEvent) + except exceptions.DuplicatedRegistrationError: + pass @router.get("/health", summary="Health check") async def health(self) -> JSONResponse: - return JSONResponse({"status": self.webhook_service.health}) + # TODO: JSON serialize + return JSONResponse({"status": self.webhook_service.health.healthy}) @router.post("/ci", summary="CI Webhook") async def ci(self, request: Request): - self.webhook_service.enqueue(mapper.map(request)) + self.event_bus.publish(mapper.map(request)) return Response(status_code=201) diff --git a/app/core/core.py b/app/core/core.py index 90d49d0..3b16ea1 100644 --- a/app/core/core.py +++ b/app/core/core.py @@ -3,6 +3,7 @@ import uuid from typing import Annotated, List from injectable import injectable, autowired, Autowired +from typing_extensions import deprecated from app.core.queue import EnqueuedProcessor, ProcessQueue, Task, Result from app.model.healthcheck import HealthCheck @@ -11,20 +12,19 @@ from app.services import DockerService, GitService, Passwords logger = logging.getLogger(__name__) +@deprecated("Use event bus instead.") @injectable(singleton=True) -class WebhookProcessor(EnqueuedProcessor): +class WebhookProcessor: @autowired def __init__(self, docker: Annotated[DockerService, Autowired], git: Annotated[GitService, Autowired], - keepass: Annotated[Passwords, Autowired], - queue: Annotated[ProcessQueue, Autowired]): - super().__init__(queue) + keepass: Annotated[Passwords, Autowired]): self._docker = docker self._git = git self._keepass = keepass def enqueue(self, event: WebhookEvent): - self._enqueue(Task(uuid.UUID(), self, event)) + pass def _process(self, task: Task[WebhookEvent]) -> Result: event: WebhookEvent = task.payload diff --git a/app/core/injects.py b/app/core/injects.py index 3dfcc66..0d7ff9c 100644 --- a/app/core/injects.py +++ b/app/core/injects.py @@ -1,6 +1,7 @@ from injectable import inject from app.core.core import WebhookProcessor +from app.events import SimpleEventBus class AutowireSupport: @@ -8,3 +9,7 @@ class AutowireSupport: @staticmethod def webhook_processor(): return inject(WebhookProcessor) + + @staticmethod + def event_bus(): + return inject(SimpleEventBus) diff --git a/app/core/woodpecker.py b/app/core/woodpecker.py new file mode 100644 index 0000000..8236bf5 --- /dev/null +++ b/app/core/woodpecker.py @@ -0,0 +1,22 @@ +import logging + +from injectable import injectable + +from app.events import SimpleEventBus +from app.model.webhook import WebhookEvent + +logger = logging.getLogger(__name__) + +@injectable +class Woodpecker: + + @SimpleEventBus.on(WebhookEvent) + def on_event(self, event): # TODO: caller nie działa -> brakuje instancji klasy? + logger.info(f"Received event: {event}") + pass + + +@SimpleEventBus.on(WebhookEvent) +def on_event2(event): # TODO: Tu działa + logger.info(f"F2: Received event: {event}") + pass diff --git a/app/events/__init__.py b/app/events/__init__.py new file mode 100644 index 0000000..fbd92d2 --- /dev/null +++ b/app/events/__init__.py @@ -0,0 +1,41 @@ +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass +from functools import wraps +from typing import Dict, List, Callable + +from injectable import injectable, inject + + +@dataclass +class Event: + pass + + +@injectable(singleton=True) +class SimpleEventBus: + def __init__(self): + self._handlers: Dict[type, List[Callable]] = {} + self._executor = ThreadPoolExecutor() + + def publish(self, event: Event) -> None: + for handler in self._handlers.get(type(event), []): + # Fire-and-forget execution + self._executor.submit(handler, event) + + def subscribe(self, event_type: type, handler: Callable) -> None: + if event_type not in self._handlers: + self._handlers[event_type] = [] + self._handlers[event_type].append(handler) + + @staticmethod + def on(event: type) -> Callable: + def outer(func): + inject(SimpleEventBus).subscribe(event, func) + + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + return outer diff --git a/app/main.py b/app/main.py index 424548e..669d2d4 100644 --- a/app/main.py +++ b/app/main.py @@ -11,11 +11,11 @@ class KarlApplication: from starlette.types import Receive, Scope, Send def __init__(self) -> None: self._set_logging() + load_injection_container() _app = FastAPI(title="Karl", version="0.1.0") self._set_middlewares(_app) self._set_routes(_app) self._set_events(_app) - self._init_services() self._app = _app @@ -58,10 +58,6 @@ class KarlApplication: def _set_events(self, app: FastAPI): pass - def _init_services(self): - logger = logging.getLogger(__name__) - load_injection_container() - def run(): return KarlApplication() diff --git a/app/model/webhook.py b/app/model/webhook.py index da9ad8d..e8c3b0d 100644 --- a/app/model/webhook.py +++ b/app/model/webhook.py @@ -1,9 +1,11 @@ from dataclasses import dataclass from typing import List +from app.events import Event + @dataclass -class WebhookEvent: +class WebhookEvent(Event): _id: str commit: str message: str diff --git a/app/services/passwords.py b/app/services/passwords.py index 18d8519..6ff8406 100644 --- a/app/services/passwords.py +++ b/app/services/passwords.py @@ -11,7 +11,7 @@ class Passwords: settings = get_settings() with open(settings.kp.secret, "r") as fh: - secret = fh.read() + secret = fh.read().splitlines()[0] self._kp_org = self.__get_or_create_store(settings.kp.file, secret) self._kp = self.__get_lock(settings.kp.file, secret)