diff --git a/app/api/v1.py b/app/api/v1.py index eba78a4..3950b3b 100644 --- a/app/api/v1.py +++ b/app/api/v1.py @@ -30,5 +30,5 @@ class APIv1: @router.post("/ci", summary="CI Webhook") async def ci(self, request: Request): - self.webhook_service.process_ci_event(mapper.map(request)) + self.webhook_service.enqueue(mapper.map(request)) return Response(status_code=201) diff --git a/app/core/core.py b/app/core/core.py index a7096d2..adc68fd 100644 --- a/app/core/core.py +++ b/app/core/core.py @@ -1,23 +1,30 @@ +import uuid from typing import Annotated from injectable import injectable, autowired, Autowired +from app.core.queue import EnqueuedProcessor, ProcessQueue, Task, Result from app.model.healthcheck import HealthCheck from app.model.webhook import WebhookEvent from app.services import DockerService, GitService, Passwords -@injectable -class WebhookProcessor: +@injectable(singleton=True) +class WebhookProcessor(EnqueuedProcessor): @autowired def __init__(self, docker: Annotated[DockerService, Autowired], git: Annotated[GitService, Autowired], - keepass: Annotated[Passwords, Autowired]): + keepass: Annotated[Passwords, Autowired], + queue: Annotated[ProcessQueue, Autowired]): + super().__init__(queue) self._docker = docker self._git = git self._keepass = keepass - def process_ci_event(self, event: WebhookEvent): + def enqueue(self, event: WebhookEvent): + self._enqueue(Task(uuid.UUID(), self, event)) + + def _process(self, task: Task) -> Result: pass @property diff --git a/app/core/queue.py b/app/core/queue.py new file mode 100644 index 0000000..4788c3a --- /dev/null +++ b/app/core/queue.py @@ -0,0 +1,50 @@ +import time +import uuid +from abc import ABC, abstractmethod +from dataclasses import dataclass +from multiprocessing import Queue, Process + +from injectable import injectable + + +@dataclass +class Task: + _id: uuid.UUID + processor: 'EnqueuedProcessor' + payload: object + +@dataclass +class Result: + _id: uuid.UUID + success: bool + error: str | None = None + +@injectable(singleton=True) +class ProcessQueue: + def __init__(self): + self._q = Queue() + self._process_thread = Process(target=self._run, args=(self._q,)) + self._process_thread.start() + + def put(self, task: Task): + self._q.put(task) + + def _run(self, queue: Queue): + while True: + if queue.empty(): + time.sleep(10) + continue + task = queue.get() + task.processor._process(task.payload) + + +class EnqueuedProcessor(ABC): + def __init__(self, queue: ProcessQueue): + self._queue = queue + + def _enqueue(self, task: Task): + self._queue.put(task) + + @abstractmethod + def _process(self, task: Task) -> Result: + pass