karl/app/core/core.py
2025-10-22 23:58:15 +02:00

35 lines
1.2 KiB
Python

import uuid
from typing import Annotated
from injectable import injectable, autowired, Autowired
from app.core.queue import EnqueuedProcessor, ProcessQueue, Task, Result
from app.model.healthcheck import HealthCheck
from app.model.webhook import WebhookEvent
from app.services import DockerService, GitService, Passwords
@injectable(singleton=True)
class WebhookProcessor(EnqueuedProcessor):
@autowired
def __init__(self, docker: Annotated[DockerService, Autowired],
git: Annotated[GitService, Autowired],
keepass: Annotated[Passwords, Autowired],
queue: Annotated[ProcessQueue, Autowired]):
super().__init__(queue)
self._docker = docker
self._git = git
self._keepass = keepass
def enqueue(self, event: WebhookEvent):
self._enqueue(Task(uuid.UUID(), self, event))
def _process(self, task: Task) -> Result:
pass
@property
def health(self) -> HealthCheck:
return HealthCheck(
self._docker is not None and self._git is not None and self._keepass is not None,
f"Docker: {self._docker is not None}, Git: {self._git is not None}, KeePass: {self._keepass is not None}"
)