import time import uuid from abc import ABC, abstractmethod from dataclasses import dataclass from multiprocessing import Queue, Process from typing import TypeVar from injectable import injectable T = TypeVar('T') @dataclass class Task[T]: id: uuid.UUID processor: 'EnqueuedProcessor' payload: T @dataclass class Result: _id: uuid.UUID success: bool error: str | None = None @injectable(singleton=True) class ProcessQueue: def __init__(self): self._q = Queue() self._process_thread = Process(target=self._run, args=(self._q,)) self._process_thread.start() def put(self, task: Task): self._q.put(task) def _run(self, queue: Queue): while True: if queue.empty(): time.sleep(10) continue task = queue.get() task.processor._process(task.payload) class EnqueuedProcessor(ABC): def __init__(self, queue: ProcessQueue): self._queue = queue def _enqueue(self, task: Task): self._queue.put(task) @abstractmethod def _process(self, task: Task) -> Result: pass