feat: WoodpeckerRunner events
This commit is contained in:
parent
7808a66342
commit
71de3c76c6
1 changed files with 39 additions and 33 deletions
|
|
@ -1,10 +1,11 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from collections import deque
|
||||
from pathlib import Path
|
||||
from threading import RLock, Thread
|
||||
from typing import Annotated
|
||||
|
||||
from bubus import EventBus
|
||||
from bubus import EventBus, BaseEvent
|
||||
from injectable import injectable, Autowired, autowired
|
||||
|
||||
from config import get_settings
|
||||
|
|
@ -15,15 +16,20 @@ from services.mo import Mo
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RunnerResult(BaseEvent):
|
||||
success: bool = False
|
||||
throwable: Exception | None = None
|
||||
|
||||
|
||||
class WoodpeckerRunner(Thread):
|
||||
@autowired
|
||||
def __init__(self, git: GitService, docker: DockerService, mo: Mo,
|
||||
success_callback=None, error_callback=None):
|
||||
bus: Annotated[EventBus, Autowired]):
|
||||
super().__init__(daemon=True)
|
||||
self._git = git
|
||||
self._docker = docker
|
||||
self._mo = mo
|
||||
self._success_callback = success_callback
|
||||
self._error_callback = error_callback
|
||||
self._bus = bus
|
||||
self._event: WoodpeckerEvent | None = None
|
||||
self._root = get_settings().git.path
|
||||
|
||||
|
|
@ -32,21 +38,27 @@ class WoodpeckerRunner(Thread):
|
|||
self.start()
|
||||
|
||||
def run(self):
|
||||
async def dispatch(r: RunnerResult):
|
||||
await self._bus.dispatch(r)
|
||||
|
||||
result = RunnerResult()
|
||||
try:
|
||||
service = self.get_service(self._event.files)
|
||||
if service is None:
|
||||
logger.info("No service found.")
|
||||
return self._success_callback()
|
||||
service_path = f"{self._root}/compose/{service}/docker-compose.yml"
|
||||
self._git.checkout(self._event.commit)
|
||||
for file in self._event.files:
|
||||
if file.__contains__('.mo.'):
|
||||
self._mo.process(Path(f"{self._root}{file}").absolute())
|
||||
self._docker.reload(Path(service_path).absolute())
|
||||
|
||||
return self._success_callback()
|
||||
result.success = True
|
||||
else:
|
||||
service_path = f"{self._root}/compose/{service}/docker-compose.yml"
|
||||
self._git.checkout(self._event.commit)
|
||||
for file in self._event.files:
|
||||
if file.__contains__('.mo.'):
|
||||
self._mo.process(Path(f"{self._root}{file}").absolute())
|
||||
self._docker.reload(Path(service_path).absolute())
|
||||
result.success = True
|
||||
except Exception as e:
|
||||
return self._error_callback(e)
|
||||
result.throwable = e
|
||||
|
||||
asyncio.run(dispatch(result))
|
||||
|
||||
def get_service(self, files: list[str]) -> str | None:
|
||||
supported_files = []
|
||||
|
|
@ -76,40 +88,34 @@ class Woodpecker:
|
|||
self._pending = deque()
|
||||
self._lock = RLock()
|
||||
bus.on(WoodpeckerEvent, self.on_ci_event)
|
||||
bus.on(RunnerResult, self._on_runner_completed)
|
||||
logger.info("Woodpecker initialized.")
|
||||
|
||||
def on_ci_event(self, event: WoodpeckerEvent):
|
||||
logger.info(f"Received event: {event.event_id}")
|
||||
async def on_ci_event(self, event: WoodpeckerEvent):
|
||||
logger.debug(f"Received WoodpeckerEvent: {event.event_id}")
|
||||
with self._lock:
|
||||
logger.debug("Lock acquired [on-ci-event]")
|
||||
if len(self._pending) > 0 or self._runner is not None:
|
||||
self._pending.append(event)
|
||||
return
|
||||
self._start_runner(event)
|
||||
else:
|
||||
self._start_runner(event)
|
||||
|
||||
def _start_runner(self, event: WoodpeckerEvent):
|
||||
with self._lock:
|
||||
logger.debug("Lock acquired [start-runner]")
|
||||
self._runner = WoodpeckerRunner(self._git, self._docker, self._mo,
|
||||
self._on_runner_completed, self._on_runner_error)
|
||||
self._runner = WoodpeckerRunner(self._git, self._docker, self._mo)
|
||||
self._runner.process_event(event)
|
||||
|
||||
def _on_runner_completed(self):
|
||||
logger.info("Runner completed.")
|
||||
self._runner.join()
|
||||
def _on_runner_completed(self, result: RunnerResult):
|
||||
logger.debug(f"Received RunnerResult: {result.event_id}")
|
||||
logger.info(f"Runner completed {'successfully' if result.success else 'with error'}.")
|
||||
if result.throwable is not None:
|
||||
logger.error(f"Runner error: {result.throwable}", exc_info=True)
|
||||
self._runner.join(timeout=1)
|
||||
logger.debug("Runner joined.")
|
||||
with self._lock:
|
||||
logger.debug("Lock acquired [on-runner-completed]")
|
||||
self._runner = None
|
||||
if len(self._pending) > 0:
|
||||
event = self._pending.popleft()
|
||||
self._start_runner(event)
|
||||
|
||||
def _on_runner_error(self, t: Exception):
|
||||
logger.error(f"Runner error: {t}", exc_info=True)
|
||||
self._runner.join()
|
||||
with self._lock:
|
||||
logger.debug("Lock acquired [on-runner-error]")
|
||||
self._runner = None
|
||||
if len(self._pending) > 0:
|
||||
event = self._pending.popleft()
|
||||
self._start_runner(event)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue