From 34ee5f87549a7838404e1c8218d344032d6367ca Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Sat, 1 Nov 2025 23:02:28 +0100 Subject: [PATCH] CI events flow simplified --- app/api/v1.py | 16 ++++++------ app/core/core.py | 51 ------------------------------------- app/core/injects.py | 11 +++----- app/core/queue.py | 52 -------------------------------------- app/core/woodpecker.py | 57 +++++++++++++++++++++++++++++++++++------- app/events/__init__.py | 41 ------------------------------ app/model/webhook.py | 5 +--- 7 files changed, 59 insertions(+), 174 deletions(-) delete mode 100644 app/core/core.py delete mode 100644 app/core/queue.py delete mode 100644 app/events/__init__.py diff --git a/app/api/v1.py b/app/api/v1.py index 4fbe31f..a012834 100644 --- a/app/api/v1.py +++ b/app/api/v1.py @@ -4,10 +4,9 @@ from fastapi_utils.cbv import cbv from starlette.responses import JSONResponse, Response from app.api.models import Request -from app.core.core import WebhookProcessor from app.core.injects import AutowireSupport -from app.events import SimpleEventBus -from app.model.webhook import WebhookEvent +from app.core.woodpecker import Woodpecker +from app.model.webhook import WoodpeckerEvent router = APIRouter() @@ -19,22 +18,21 @@ async def root(): @cbv(router) class APIv1: - webhook_service: WebhookProcessor = Depends(AutowireSupport.webhook_processor) - event_bus: SimpleEventBus = Depends(AutowireSupport.event_bus) + woodpecker: Woodpecker = Depends(AutowireSupport.woodpecker) logger = __import__('logging').getLogger(__name__) def __init__(self): - try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie - mapper.add(Request, WebhookEvent) + try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie + mapper.add(Request, WoodpeckerEvent) except exceptions.DuplicatedRegistrationError: pass @router.get("/health", summary="Health check") async def health(self) -> JSONResponse: # TODO: JSON serialize - return JSONResponse({"status": self.webhook_service.health.healthy}) + return JSONResponse({"status": "unknown"}) @router.post("/ci", summary="CI Webhook") async def ci(self, request: Request): - self.event_bus.publish(mapper.map(request)) + self.woodpecker.on_ci_event(mapper.map(request)) return Response(status_code=201) diff --git a/app/core/core.py b/app/core/core.py deleted file mode 100644 index 3b16ea1..0000000 --- a/app/core/core.py +++ /dev/null @@ -1,51 +0,0 @@ -import logging -import uuid -from typing import Annotated, List - -from injectable import injectable, autowired, Autowired -from typing_extensions import deprecated - -from app.core.queue import EnqueuedProcessor, ProcessQueue, Task, Result -from app.model.healthcheck import HealthCheck -from app.model.webhook import WebhookEvent -from app.services import DockerService, GitService, Passwords - -logger = logging.getLogger(__name__) - -@deprecated("Use event bus instead.") -@injectable(singleton=True) -class WebhookProcessor: - @autowired - def __init__(self, docker: Annotated[DockerService, Autowired], - git: Annotated[GitService, Autowired], - keepass: Annotated[Passwords, Autowired]): - self._docker = docker - self._git = git - self._keepass = keepass - - def enqueue(self, event: WebhookEvent): - pass - - def _process(self, task: Task[WebhookEvent]) -> Result: - event: WebhookEvent = task.payload - # TODO: persist event data - commit_hash = self._git.get_new_commit_hash() - if commit_hash != event.commit: - logger.warning(f"Commit hash mismatch: {commit_hash} != {event.commit}") - return Result(task.id, False, "Commit hash mismatch") - # TODO: persist commit data - service = self._get_service(event.files) - - - - return Result(task.id, True) - - def _get_service(self, files: List[str]) -> str: - pass - - @property - def health(self) -> HealthCheck: - return HealthCheck( - self._docker is not None and self._git is not None and self._keepass is not None, - f"Docker: {self._docker is not None}, Git: {self._git is not None}, KeePass: {self._keepass is not None}" - ) diff --git a/app/core/injects.py b/app/core/injects.py index 0d7ff9c..115ae95 100644 --- a/app/core/injects.py +++ b/app/core/injects.py @@ -1,15 +1,10 @@ from injectable import inject -from app.core.core import WebhookProcessor -from app.events import SimpleEventBus +from app.core.woodpecker import Woodpecker class AutowireSupport: @staticmethod - def webhook_processor(): - return inject(WebhookProcessor) - - @staticmethod - def event_bus(): - return inject(SimpleEventBus) + def woodpecker(): + return inject(Woodpecker) diff --git a/app/core/queue.py b/app/core/queue.py deleted file mode 100644 index 3e92cd6..0000000 --- a/app/core/queue.py +++ /dev/null @@ -1,52 +0,0 @@ -import time -import uuid -from abc import ABC, abstractmethod -from dataclasses import dataclass -from multiprocessing import Queue, Process -from typing import TypeVar - -from injectable import injectable - -T = TypeVar('T') - -@dataclass -class Task[T]: - id: uuid.UUID - processor: 'EnqueuedProcessor' - payload: T - -@dataclass -class Result: - _id: uuid.UUID - success: bool - error: str | None = None - -@injectable(singleton=True) -class ProcessQueue: - def __init__(self): - self._q = Queue() - self._process_thread = Process(target=self._run, args=(self._q,)) - self._process_thread.start() - - def put(self, task: Task): - self._q.put(task) - - def _run(self, queue: Queue): - while True: - if queue.empty(): - time.sleep(10) - continue - task = queue.get() - task.processor._process(task.payload) - - -class EnqueuedProcessor(ABC): - def __init__(self, queue: ProcessQueue): - self._queue = queue - - def _enqueue(self, task: Task): - self._queue.put(task) - - @abstractmethod - def _process(self, task: Task) -> Result: - pass diff --git a/app/core/woodpecker.py b/app/core/woodpecker.py index 9b6ffb4..9ba242d 100644 --- a/app/core/woodpecker.py +++ b/app/core/woodpecker.py @@ -1,25 +1,64 @@ import logging +from collections import deque +from multiprocessing import Process, Lock from typing import Annotated -from injectable import injectable, Autowired, autowired, inject, injectable_factory +from injectable import injectable, Autowired, autowired -from app.events import SimpleEventBus -from app.model.webhook import WebhookEvent +from app.model.webhook import WoodpeckerEvent +from app.services import Passwords, GitService, DockerService logger = logging.getLogger(__name__) +class WoodpeckerRunner(Process): + def __init__(self, event: WoodpeckerEvent): + super().__init__() + self._event = event + + def run(self): + super().run() + """ + event: WebhookEvent = task.payload + # TODO: persist event data + commit_hash = self._git.get_new_commit_hash() + if commit_hash != event.commit: + logger.warning(f"Commit hash mismatch: {commit_hash} != {event.commit}") + return Result(task.id, False, "Commit hash mismatch") + # TODO: persist commit data + service = self._get_service(event.files) + + """ + + @injectable(singleton=True) class Woodpecker: @autowired - def __init__(self, event_bus: Annotated[SimpleEventBus, Autowired]): + def __init__(self, passwords: Annotated[Passwords, Autowired]): + self._passwords = passwords + self._git = GitService() + self._docker = DockerService() + self._runner: WoodpeckerRunner | None = None + self._pending = deque() + self._lock = Lock() logger.info("Woodpecker initialized.") - event_bus.subscribe(WebhookEvent, self.on_ci_event) - def on_ci_event(self, event): + def on_ci_event(self, event: WoodpeckerEvent): logger.info(f"Received event: {event}") + with self._lock: + if len(self._pending) > 0 or self._runner is not None: + self._pending.append(event) + return + self._start_runner(event) + + def _start_runner(self, event: WoodpeckerEvent): pass - -instance = Woodpecker(inject(SimpleEventBus)) -injectable_factory(Woodpecker)(lambda: instance) + def _on_runner_completed(self): + logger.info("Runner completed.") + self._runner.join() + with self._lock: + self._runner = None + if len(self._pending) > 0: + event = self._pending.popleft() + self._start_runner(event) diff --git a/app/events/__init__.py b/app/events/__init__.py deleted file mode 100644 index 5d54a87..0000000 --- a/app/events/__init__.py +++ /dev/null @@ -1,41 +0,0 @@ -from concurrent.futures import ThreadPoolExecutor -from dataclasses import dataclass -from functools import wraps -from typing import Dict, List, Callable - -from injectable import injectable, inject - - -@dataclass -class Event: - pass - - -@injectable(singleton=True) -class SimpleEventBus: - def __init__(self): - self._handlers: Dict[type, List[Callable]] = {} - self._executor = ThreadPoolExecutor() - - def publish(self, event: Event) -> None: - for handler in self._handlers.get(type(event), []): - # Fire-and-forget execution - self._executor.submit(handler, event) - - def subscribe(self, event_type: type, handler: Callable) -> None: - if event_type not in self._handlers: - self._handlers[event_type] = [] - self._handlers[event_type].append(handler) - - @staticmethod - def on(event: type) -> Callable: - def outer(func): - inject(SimpleEventBus).subscribe(event, func) - - @wraps(func) - def wrapper(*args, **kwargs): - return func(*args, **kwargs) - - return wrapper - - return outer diff --git a/app/model/webhook.py b/app/model/webhook.py index e8c3b0d..364e17b 100644 --- a/app/model/webhook.py +++ b/app/model/webhook.py @@ -1,11 +1,8 @@ from dataclasses import dataclass from typing import List -from app.events import Event - - @dataclass -class WebhookEvent(Event): +class WoodpeckerEvent: _id: str commit: str message: str