Merge pull request 'events' (#8) from events into webhook

Reviewed-on: https://hattori.ztsh.eu/paas/karl/pulls/8
This commit is contained in:
Piotr Dec 2025-11-03 22:37:36 +01:00
commit f55863b8d2
16 changed files with 236 additions and 161 deletions

View file

@ -1,12 +1,12 @@
from automapper import mapper from automapper import mapper, exceptions
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
from fastapi_utils.cbv import cbv from fastapi_utils.cbv import cbv
from starlette.responses import JSONResponse, Response from starlette.responses import JSONResponse, Response
from app.api.models import Request from app.api.models import Request
from app.core.core import WebhookProcessor
from app.core.injects import AutowireSupport from app.core.injects import AutowireSupport
from app.model.webhook import WebhookEvent from app.core.woodpecker import Woodpecker
from app.model.webhook import WoodpeckerEvent
router = APIRouter() router = APIRouter()
@ -18,17 +18,21 @@ async def root():
@cbv(router) @cbv(router)
class APIv1: class APIv1:
webhook_service: WebhookProcessor = Depends(AutowireSupport.webhook_processor) woodpecker: Woodpecker = Depends(AutowireSupport.woodpecker)
logger = __import__('logging').getLogger(__name__) logger = __import__('logging').getLogger(__name__)
def __init__(self): def __init__(self):
mapper.add(Request, WebhookEvent) try: # TODO: rejestracja w innym miejscu: klasa jest przeładowywana co żądanie
mapper.add(Request, WoodpeckerEvent)
except exceptions.DuplicatedRegistrationError:
pass
@router.get("/health", summary="Health check") @router.get("/health", summary="Health check")
async def health(self) -> JSONResponse: async def health(self) -> JSONResponse:
return JSONResponse({"status": self.webhook_service.health}) # TODO: JSON serialize
return JSONResponse({"status": "unknown"})
@router.post("/ci", summary="CI Webhook") @router.post("/ci", summary="CI Webhook")
async def ci(self, request: Request): async def ci(self, request: Request):
self.webhook_service.enqueue(mapper.map(request)) self.woodpecker.on_ci_event(mapper.map(request))
return Response(status_code=201) return Response(status_code=201)

View file

@ -1,51 +0,0 @@
import logging
import uuid
from typing import Annotated, List
from injectable import injectable, autowired, Autowired
from app.core.queue import EnqueuedProcessor, ProcessQueue, Task, Result
from app.model.healthcheck import HealthCheck
from app.model.webhook import WebhookEvent
from app.services import DockerService, GitService, Passwords
logger = logging.getLogger(__name__)
@injectable(singleton=True)
class WebhookProcessor(EnqueuedProcessor):
@autowired
def __init__(self, docker: Annotated[DockerService, Autowired],
git: Annotated[GitService, Autowired],
keepass: Annotated[Passwords, Autowired],
queue: Annotated[ProcessQueue, Autowired]):
super().__init__(queue)
self._docker = docker
self._git = git
self._keepass = keepass
def enqueue(self, event: WebhookEvent):
self._enqueue(Task(uuid.UUID(), self, event))
def _process(self, task: Task[WebhookEvent]) -> Result:
event: WebhookEvent = task.payload
# TODO: persist event data
commit_hash = self._git.get_new_commit_hash()
if commit_hash != event.commit:
logger.warning(f"Commit hash mismatch: {commit_hash} != {event.commit}")
return Result(task.id, False, "Commit hash mismatch")
# TODO: persist commit data
service = self._get_service(event.files)
return Result(task.id, True)
def _get_service(self, files: List[str]) -> str:
pass
@property
def health(self) -> HealthCheck:
return HealthCheck(
self._docker is not None and self._git is not None and self._keepass is not None,
f"Docker: {self._docker is not None}, Git: {self._git is not None}, KeePass: {self._keepass is not None}"
)

View file

@ -1,10 +1,10 @@
from injectable import inject from injectable import inject
from app.core.core import WebhookProcessor from app.core.woodpecker import Woodpecker
class AutowireSupport: class AutowireSupport:
@staticmethod @staticmethod
def webhook_processor(): def woodpecker():
return inject(WebhookProcessor) return inject(Woodpecker)

View file

@ -1,52 +0,0 @@
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass
from multiprocessing import Queue, Process
from typing import TypeVar
from injectable import injectable
T = TypeVar('T')
@dataclass
class Task[T]:
id: uuid.UUID
processor: 'EnqueuedProcessor'
payload: T
@dataclass
class Result:
_id: uuid.UUID
success: bool
error: str | None = None
@injectable(singleton=True)
class ProcessQueue:
def __init__(self):
self._q = Queue()
self._process_thread = Process(target=self._run, args=(self._q,))
self._process_thread.start()
def put(self, task: Task):
self._q.put(task)
def _run(self, queue: Queue):
while True:
if queue.empty():
time.sleep(10)
continue
task = queue.get()
task.processor._process(task.payload)
class EnqueuedProcessor(ABC):
def __init__(self, queue: ProcessQueue):
self._queue = queue
def _enqueue(self, task: Task):
self._queue.put(task)
@abstractmethod
def _process(self, task: Task) -> Result:
pass

107
app/core/woodpecker.py Normal file
View file

@ -0,0 +1,107 @@
import logging
from collections import deque
from multiprocessing import Process, Lock
from pathlib import Path
from typing import Annotated
from injectable import injectable, Autowired, autowired
from app.config import get_settings
from app.model.webhook import WoodpeckerEvent
from app.services import GitService, DockerService
from app.services.mo import Mo
logger = logging.getLogger(__name__)
class WoodpeckerRunner(Process):
def __init__(self, git: GitService, docker: DockerService, mo: Mo,
success_callback=None, error_callback=None):
super().__init__(daemon=True)
self._git = git
self._docker = docker
self._mo = mo
self._success_callback = success_callback
self._error_callback = error_callback
self._event: WoodpeckerEvent | None = None
self._root = get_settings().git.path
def process_event(self, event: WoodpeckerEvent):
self._event = event
self.start()
def run(self):
try:
service = self.get_service(self._event.files)
if service is None:
logger.info("No service found.")
return self._success_callback()
service_path = f"{self._root}/compose/{service}/docker-compose.yml"
self._git.checkout(self._event.commit)
for file in self._event.files:
if file.__contains__('.mo.'):
self._mo.process(Path(f"{self._root}{file}").absolute())
self._docker.reload(Path(service_path).absolute())
return self._success_callback()
except Exception as e:
return self._error_callback(e)
def get_service(self, files: list[str]) -> str | None:
supported_files = []
for f in files:
f_parts = f.split("/")
if f_parts[0] in ["compose", "files"]:
supported_files.append(f[1])
match len(set(supported_files)):
case 0:
return None
case 1:
return supported_files[0]
case _:
raise Exception("Multiple services are not supported.")
@injectable(singleton=True)
class Woodpecker:
@autowired
def __init__(self, mo: Annotated[Mo, Autowired]):
self._mo = mo
self._git = GitService()
self._docker = DockerService()
self._runner: WoodpeckerRunner | None = None
self._pending = deque()
self._lock = Lock()
logger.info("Woodpecker initialized.")
def on_ci_event(self, event: WoodpeckerEvent):
logger.info(f"Received event: {event}")
with self._lock:
if len(self._pending) > 0 or self._runner is not None:
self._pending.append(event)
return
self._start_runner(event)
def _start_runner(self, event: WoodpeckerEvent):
with self._lock:
self._runner = WoodpeckerRunner(self._git, self._docker, self._mo,
self._on_runner_completed, self._on_runner_error)
self._runner.process_event(event)
def _on_runner_completed(self):
logger.info("Runner completed.")
self._runner.join()
with self._lock:
self._runner = None
if len(self._pending) > 0:
event = self._pending.popleft()
self._start_runner(event)
def _on_runner_error(self, t: Exception):
logger.error(f"Runner error: {t}", exc_info=True)
self._runner.join()
with self._lock:
self._runner = None
if len(self._pending) > 0:
event = self._pending.popleft()
self._start_runner(event)

View file

@ -11,11 +11,11 @@ class KarlApplication:
from starlette.types import Receive, Scope, Send from starlette.types import Receive, Scope, Send
def __init__(self) -> None: def __init__(self) -> None:
self._set_logging() self._set_logging()
load_injection_container()
_app = FastAPI(title="Karl", version="0.1.0") _app = FastAPI(title="Karl", version="0.1.0")
self._set_middlewares(_app) self._set_middlewares(_app)
self._set_routes(_app) self._set_routes(_app)
self._set_events(_app) self._set_events(_app)
self._init_services()
self._app = _app self._app = _app
@ -58,10 +58,6 @@ class KarlApplication:
def _set_events(self, app: FastAPI): def _set_events(self, app: FastAPI):
pass pass
def _init_services(self):
logger = logging.getLogger(__name__)
load_injection_container()
def run(): def run():
return KarlApplication() return KarlApplication()

View file

@ -1,9 +1,8 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import List from typing import List
@dataclass @dataclass
class WebhookEvent: class WoodpeckerEvent:
_id: str _id: str
commit: str commit: str
message: str message: str

View file

@ -1,4 +1,5 @@
import logging import logging
from pathlib import Path
import docker import docker
from docker.models.containers import Container from docker.models.containers import Container
@ -33,3 +34,8 @@ class DockerService:
@property @property
def tree(self) -> Tree: def tree(self) -> Tree:
return self._tree return self._tree
def reload(self, compose_path: Path):
cmd = ["sudo", "docker", "compose", "-f", str(compose_path), "up", "-d"]
# TODO: subprocess

37
app/services/mo.py Normal file
View file

@ -0,0 +1,37 @@
from pathlib import Path
from string import Template
from typing import Annotated
from injectable import injectable, autowired, Autowired
from app.services import Passwords
class SimpleValueTemplate(Template):
# Pozwala na kropki w nazwach placeholderów, np. ${user.name.first}
idpattern = r'[_a-zA-Z][_a-zA-Z0-9.]*'
class ComplexValueTemplate(SimpleValueTemplate):
delimiter = '@'
@injectable
class Mo:
@autowired
def __init__(self, passwords: Annotated[Passwords, Autowired]):
self._passwords = passwords
def process(self, mo_file: Path):
raw = ''
with open(mo_file, "r") as mo:
raw = mo.read()
cmp = ComplexValueTemplate(raw)
rendered = cmp.substitute(self._passwords.get_values(cmp.get_identifiers()))
smp = SimpleValueTemplate(rendered)
ids = [_id + '.password' for _id in smp.get_identifiers()]
mappings = {k.replace('.password', ''): v for k, v in self._passwords.get_values(ids).items()}
rendered = smp.substitute(mappings)
de_mo_ified = str(mo_file).replace(".mo", "")
with open(de_mo_ified, "w") as mo:
mo.write(rendered)

View file

@ -1,7 +1,8 @@
import os.path import os.path
import shutil
from injectable import injectable from injectable import injectable
from pykeepass import PyKeePass, create_database, Group from pykeepass import PyKeePass, create_database
@injectable(singleton=True) @injectable(singleton=True)
@ -11,30 +12,49 @@ class Passwords:
settings = get_settings() settings = get_settings()
with open(settings.kp.secret, "r") as fh: with open(settings.kp.secret, "r") as fh:
secret = fh.read() secret = fh.read().splitlines()[0]
self._path = settings.kp.file
self._kp_org = self.__get_or_create_store(settings.kp.file, secret) self._kp_org = self._open_or_create(self._path, secret)
self._kp = self.__get_lock(settings.kp.file, secret) self._kp = self._open_lock(self._path, secret)
@staticmethod @staticmethod
def __get_or_create_store(path, passwd) -> PyKeePass: def _open_or_create(path, password) -> PyKeePass:
if os.path.exists(path): if os.path.exists(path):
return PyKeePass( return PyKeePass(path, password=password)
path, return create_database(path, password)
password=passwd,
)
return create_database(path, passwd)
@staticmethod @staticmethod
def __get_lock(path, passwd) -> PyKeePass: def _open_lock(path, password) -> PyKeePass:
lock_path = path + ".lock" lock_path = path + ".lock"
import shutil
shutil.copyfile(path, lock_path) shutil.copyfile(path, lock_path)
return Passwords.__get_or_create_store(lock_path, passwd) return Passwords._open_or_create(lock_path, password)
@property def get_values(self, keys: list[str]) -> dict[str, str]:
def store(self): output = {}
return self._kp.root_group for k in keys:
key_parts = k.split(".")
path = key_parts[:-1] if len(key_parts) > 2 else None
entry_name = key_parts[-2]
field_name = key_parts[-1]
kp_entry = self._kp_org.find_entries(path=path, first=True, title=entry_name)
output[k] = self._get_field_value(kp_entry, field_name)
return output
def save(self, group: Group): @staticmethod
pass def _get_field_value(kp_entry, field_name):
if kp_entry is None:
return None
match field_name:
case "username":
return kp_entry.username
case "password":
return kp_entry.password
case "url":
return kp_entry.url
case _:
return kp_entry.get_custom_property(field_name)
def save(self):
# nadpisz plik źródłowy zmianami z lock
self._kp.save()
shutil.copyfile(self._path + ".lock", self._path)

View file

@ -13,14 +13,6 @@ class GitService:
self._repo.git.checkout(self._settings.git.branch) self._repo.git.checkout(self._settings.git.branch)
self._origin: Remote = self._repo.remotes.origin self._origin: Remote = self._repo.remotes.origin
def get_modified_compose(self) -> str | None:
self._update()
return self._diff()
def get_new_commit_hash(self) -> str:
self._update()
return self._repo.head.commit.hexsha
@staticmethod @staticmethod
def _check_preconditions(config: GitConfig) -> Repo: def _check_preconditions(config: GitConfig) -> Repo:
def clone(): def clone():
@ -34,16 +26,6 @@ class GitService:
return clone() return clone()
return Repo(config.path) return Repo(config.path)
def _update(self): def checkout(self, sha: str):
self._origin.pull() self._origin.fetch()
self._repo.git.checkout(sha)
def _diff(self) -> str | None:
diff = self._repo.head.commit.diff("HEAD~1")
composes = [f for f in diff if f.a_path.endswith("docker-compose.yml")]
match len(composes):
case 0:
return None
case 1:
return composes[0].a_path
case _:
raise Exception("Multiple compose files modified")

View file

@ -4,7 +4,7 @@ logging:
app: app:
host: "127.0.0.1" host: "127.0.0.1"
port: 8081 port: 8081
reload: true reload: false
git: git:
path: "F:/IdeaProjects/paas/karl/.compose_repository" path: "F:/IdeaProjects/paas/karl/.compose_repository"
branch: "main" branch: "main"

View file

@ -16,6 +16,7 @@ dependencies = [
"docker>=7.1.0", "docker>=7.1.0",
"injectable==4.0.1", "injectable==4.0.1",
"py-automapper>=2.2.0", "py-automapper>=2.2.0",
"fastapi-utils>=0.8.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

0
tests/__init__.py Normal file
View file

View file

@ -0,0 +1,3 @@
value: ${sample}
nested: ${some.nested.value}
custom: @{custom.field}

23
tests/test_mo.py Normal file
View file

@ -0,0 +1,23 @@
import os
from pathlib import Path
from unittest import TestCase
import yaml
from app.services import Passwords
from app.services.mo import Mo
class TestMo(TestCase):
def test_process(self):
target_path = Path('tests/files/test1/test.yaml')
mo = Mo(Passwords())
mo.process(Path('tests/files/test1/test.mo.yaml').absolute())
self.assertTrue(os.path.exists(target_path))
with open(target_path, 'r') as f:
content = f.read()
self.assertFalse(content.__contains__('${'))
parsed = yaml.load(content, Loader=yaml.FullLoader)
self.assertEqual('some_pass', parsed['value'])
self.assertEqual('nested_pass', parsed['nested'])
self.assertEqual('custom_content', parsed['custom'])