import logging

from automapper import exceptions, mapper
from fastapi import APIRouter, Depends
from fastapi_utils.cbv import cbv
from starlette.responses import JSONResponse, Response

from app.api.models import Request
from app.core.injects import AutowireSupport
from app.core.woodpecker import Woodpecker
from app.model.webhook import WoodpeckerEvent

router = APIRouter()


def _register_mappings() -> None:
    """Register the Request -> WoodpeckerEvent mapping exactly once.

    The registration used to live in ``APIv1.__init__``, which runs for
    every request because the class-based view is re-instantiated each
    time. Doing it at import time avoids raising and swallowing a
    ``DuplicatedRegistrationError`` on every request after the first.
    """
    try:
        mapper.add(Request, WoodpeckerEvent)
    except exceptions.DuplicatedRegistrationError:
        # The mapping is already present (e.g. registered by another module
        # or on a module reload); that is the state we want, so carry on.
        pass


_register_mappings()


@router.get("/", summary="Main API")
async def root():
    """Return the static welcome message of the v1 API."""
    return {"message": "Witaj w API v1"}


@cbv(router)
class APIv1:
    """Class-based view grouping the v1 endpoints that share dependencies."""

    # Injected per request by FastAPI through the cbv decorator.
    woodpecker: Woodpecker = Depends(AutowireSupport.woodpecker)
    # Deliberately left without a type annotation so that cbv does not treat
    # it as an injectable dependency.
    logger = logging.getLogger(__name__)

    @router.get("/health", summary="Health check")
    async def health(self) -> JSONResponse:
        """Return the service health status.

        Currently a placeholder that always reports ``"unknown"``.
        """
        # TODO: JSON serialize
        return JSONResponse({"status": "unknown"})

    @router.post("/ci", summary="CI Webhook")
    async def ci(self, request: Request):
        """Handle a CI webhook call.

        The incoming ``request`` model is converted with the automapper
        mapping registered at module import and passed to
        ``Woodpecker.on_ci_event``.

        Returns:
            An empty response with HTTP status 201.
        """
        self.woodpecker.on_ci_event(mapper.map(request))
        return Response(status_code=201)