""" queue_manager.py — Thread-pool job queue with status tracking. """ from __future__ import annotations import threading import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable class State(str, Enum): QUEUED = "queued" RUNNING = "running" DONE = "done" FAILED = "failed" @dataclass class JobStatus: job_id: str state: State = State.QUEUED message: str = "" created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time) def update(self, state: State, message: str = "") -> None: self.state = state self.message = message self.updated_at = time.time() class JobQueue: def __init__(self, max_workers: int = 8, max_jobs: int = 100) -> None: self._max_jobs = max_jobs self._executor = ThreadPoolExecutor(max_workers=max_workers) self._jobs: dict[str, JobStatus] = {} self._lock = threading.Lock() def is_full(self) -> bool: with self._lock: active = sum( 1 for s in self._jobs.values() if s.state in (State.QUEUED, State.RUNNING) ) return active >= self._max_jobs def get_status(self, job_id: str) -> JobStatus | None: with self._lock: return self._jobs.get(job_id) def _set_state(self, job_id: str, state: State, message: str = "") -> None: with self._lock: if job_id in self._jobs: self._jobs[job_id].update(state, message) def register(self, job_id: str) -> JobStatus: status = JobStatus(job_id=job_id) with self._lock: self._jobs[job_id] = status return status