| |
|
|
| import os |
| import io |
| import base64 |
| import zipfile |
| import requests |
| from typing import Iterable, Dict, Any |
|
|
|
|
| class APIClient: |
| """ |
| High-level client for communicating with the Veureu Engine API. |
| |
| Endpoints managed: |
| POST /jobs |
| → {"job_id": "..."} |
| |
| GET /jobs/{job_id}/status |
| → {"status": "queued|processing|done|failed", ...} |
| |
| GET /jobs/{job_id}/result |
| → JobResult such as {"book": {...}, "une": {...}, ...} |
| |
| This class is used by the Streamlit UI to submit videos, poll job status, |
| retrieve results, generate audio, and interact with the TTS and casting services. |
| """ |
|
|
| def __init__( |
| self, |
| base_url: str, |
| use_mock: bool = False, |
| data_dir: str | None = None, |
| token: str | None = None, |
| timeout: int = 180 |
| ): |
| """ |
| Initialize the API client. |
| |
| Args: |
| base_url: Base URL of the engine or TTS service. |
| use_mock: Whether to respond with mock data instead of real API calls. |
| data_dir: Optional data folder for local mock/test files. |
| token: Authentication token (fallback: API_SHARED_TOKEN env var). |
| timeout: Timeout in seconds for requests. |
| """ |
| self.base_url = base_url.rstrip("/") |
| self.tts_url = self.base_url |
| self.use_mock = use_mock |
| self.data_dir = data_dir |
| self.timeout = timeout |
| self.session = requests.Session() |
|
|
| |
| token = token or os.getenv("API_SHARED_TOKEN") |
| if token: |
| self.session.headers.update({"Authorization": f"Bearer {token}"}) |
|
|
|
|
| |
| |
| |
|
|
| def _post_jobs(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: |
| """Submit a video and processing modes to /jobs.""" |
| url = f"{self.base_url}/jobs" |
| files = { |
| "file": (os.path.basename(video_path), open(video_path, "rb"), "application/octet-stream") |
| } |
| data = {"modes": ",".join(modes)} |
|
|
| r = self.session.post(url, files=files, data=data, timeout=self.timeout) |
| r.raise_for_status() |
| return r.json() |
|
|
| def _get_status(self, job_id: str) -> Dict[str, Any]: |
| """Query job status.""" |
| url = f"{self.base_url}/jobs/{job_id}/status" |
| r = self.session.get(url, timeout=self.timeout) |
| r.raise_for_status() |
| return r.json() |
|
|
| def _get_result(self, job_id: str) -> Dict[str, Any]: |
| """Retrieve job result.""" |
| url = f"{self.base_url}/jobs/{job_id}/result" |
| r = self.session.get(url, timeout=self.timeout) |
| r.raise_for_status() |
| return r.json() |
|
|
|
|
| |
| |
| |
|
|
| def process_video(self, video_path: str, modes: Iterable[str]) -> Dict[str, Any]: |
| """Return {"job_id": "..."} either from mock or engine.""" |
| if self.use_mock: |
| return {"job_id": "mock-123"} |
| return self._post_jobs(video_path, modes) |
|
|
| def get_job(self, job_id: str) -> Dict[str, Any]: |
| """ |
| Returns UI-friendly job data: |
| {"status": "done", "results": {"book": {...}, "une": {...}}} |
| |
| Maps engine responses into the expected 'results' format. |
| """ |
| if self.use_mock: |
| return { |
| "status": "done", |
| "results": { |
| "book": {"text": "Example text (book)", "mp3_bytes": b""}, |
| "une": { |
| "srt": "1\n00:00:00,000 --> 00:00:01,000\nExample UNE\n", |
| "mp3_bytes": b"" |
| } |
| } |
| } |
|
|
| status_data = self._get_status(job_id) |
|
|
| |
| if status_data.get("status") in {"queued", "processing"}: |
| return {"status": status_data.get("status", "queued")} |
|
|
| raw_result = self._get_result(job_id) |
| results = {} |
|
|
| |
| if "book" in raw_result: |
| results["book"] = {"text": raw_result["book"].get("text")} |
| if "une" in raw_result: |
| results["une"] = {"srt": raw_result["une"].get("srt")} |
|
|
| |
| for section in ("book", "une"): |
| if section in raw_result: |
| if "characters" in raw_result[section]: |
| results[section]["characters"] = raw_result[section]["characters"] |
| if "metrics" in raw_result[section]: |
| results[section]["metrics"] = raw_result[section]["metrics"] |
|
|
| final_status = "done" if results else status_data.get("status", "unknown") |
| return {"status": final_status, "results": results} |
|
|
|
|
| |
| |
| |
|
|
| def tts_matxa(self, text: str, voice: str = "central/grau") -> dict: |
| """ |
| Call the TTS /tts/text endpoint to synthesize short audio. |
| |
| Returns: |
| {"mp3_bytes": b"..."} on success |
| {"error": "..."} on failure |
| """ |
| if not self.tts_url: |
| raise ValueError("TTS service URL not configured.") |
|
|
| url = f"{self.tts_url.rstrip('/')}/tts/text" |
| data = {"texto": text, "voice": voice, "formato": "mp3"} |
|
|
| try: |
| r = requests.post(url, data=data, timeout=self.timeout) |
| r.raise_for_status() |
| return {"mp3_bytes": r.content} |
| except requests.exceptions.RequestException as e: |
| return {"error": str(e)} |
|
|
| def rebuild_video_with_ad(self, video_path: str, srt_path: str) -> dict: |
| """ |
| Rebuild a video including audio description (AD) |
| by calling /tts/srt. The server returns a ZIP containing an MP4. |
| """ |
| if not self.tts_url: |
| raise ValueError("TTS service URL not configured.") |
|
|
| url = f"{self.tts_url.rstrip('/')}/tts/srt" |
|
|
| try: |
| files = { |
| "video": (os.path.basename(video_path), open(video_path, "rb"), "video/mp4"), |
| "srt": (os.path.basename(srt_path), open(srt_path, "rb"), "application/x-subrip") |
| } |
| data = {"include_final_mp4": 1} |
|
|
| r = requests.post(url, files=files, data=data, timeout=self.timeout * 5) |
| r.raise_for_status() |
|
|
| with zipfile.ZipFile(io.BytesIO(r.content)) as z: |
| for name in z.namelist(): |
| if name.endswith(".mp4"): |
| return {"video_bytes": z.read(name)} |
|
|
| return {"error": "MP4 file not found inside ZIP."} |
|
|
| except zipfile.BadZipFile: |
| return {"error": "Invalid ZIP response from server."} |
| except requests.exceptions.RequestException as e: |
| return {"error": str(e)} |
|
|
|
|
| |
| |
| |
|
|
| def create_initial_casting( |
| self, |
| video_path: str = None, |
| video_bytes: bytes = None, |
| video_name: str = None, |
| epsilon: float = 0.5, |
| min_cluster_size: int = 2 |
| ) -> dict: |
| """ |
| Calls /create_initial_casting to produce the initial actor/face clustering. |
| |
| Args: |
| video_path: Load video from disk. |
| video_bytes: Provide video already in memory. |
| video_name: Name used if video_bytes is provided. |
| epsilon: DBSCAN epsilon for clustering. |
| min_cluster_size: Minimum number of samples for DBSCAN. |
| """ |
| url = f"{self.base_url}/create_initial_casting" |
|
|
| try: |
| |
| if video_bytes: |
| files = {"video": (video_name or "video.mp4", video_bytes, "video/mp4")} |
| elif video_path: |
| with open(video_path, "rb") as f: |
| files = {"video": (os.path.basename(video_path), f.read(), "video/mp4")} |
| else: |
| return {"error": "Either video_path or video_bytes must be provided."} |
|
|
| data = { |
| "epsilon": str(epsilon), |
| "min_cluster_size": str(min_cluster_size) |
| } |
|
|
| r = self.session.post(url, files=files, data=data, timeout=self.timeout * 5) |
| r.raise_for_status() |
|
|
| if r.headers.get("content-type", "").startswith("application/json"): |
| return r.json() |
|
|
| return {"ok": True} |
|
|
| except Exception as e: |
| return {"error": str(e)} |
|
|
|
|
| |
| |
| |
|
|
| def generate_audio_from_text_file(self, text_content: str, voice: str = "central/grau") -> dict: |
| """ |
| Converts a large text into an SRT-like structure, calls /tts/srt, |
| and extracts 'ad_master.mp3' from the resulting ZIP. |
| |
| Useful for audiobook-like generation. |
| """ |
| if not self.tts_url: |
| raise ValueError("TTS service URL not configured.") |
|
|
| |
| srt_content = "" |
| start = 0 |
|
|
| for idx, raw_line in enumerate(text_content.strip().split("\n")): |
| line = raw_line.strip() |
| if not line: |
| continue |
|
|
| end = start + 5 |
|
|
| def fmt(seconds): |
| h = seconds // 3600 |
| m = (seconds % 3600) // 60 |
| s = seconds % 60 |
| return f"{h:02d}:{m:02d}:{s:02d},000" |
|
|
| srt_content += f"{idx+1}\n" |
| srt_content += f"{fmt(start)} --> {fmt(end)}\n" |
| srt_content += f"{line}\n\n" |
| start = end |
|
|
| if not srt_content: |
| return {"error": "Provided text is empty or cannot be processed."} |
|
|
| |
| url = f"{self.tts_url.rstrip('/')}/tts/srt" |
|
|
| try: |
| files = {"srt": ("fake_ad.srt", srt_content, "application/x-subrip")} |
| data = {"voice": voice, "ad_format": "mp3"} |
|
|
| r = requests.post(url, files=files, data=data, timeout=self.timeout * 5) |
| r.raise_for_status() |
|
|
| with zipfile.ZipFile(io.BytesIO(r.content)) as z: |
| if "ad_master.mp3" in z.namelist(): |
| return {"mp3_bytes": z.read("ad_master.mp3")} |
|
|
| return {"error": "'ad_master.mp3' not found inside ZIP."} |
|
|
| except requests.exceptions.RequestException as e: |
| return {"error": f"Error calling SRT API: {e}"} |
| except zipfile.BadZipFile: |
| return {"error": "Invalid ZIP response from server."} |
|
|
| def tts_long_text(self, text: str, voice: str = "central/grau") -> dict: |
| """ |
| Call /tts/text_long for very long text TTS synthesis. |
| Returns raw MP3 bytes. |
| """ |
| if not self.tts_url: |
| raise ValueError("TTS service URL not configured.") |
|
|
| url = f"{self.tts_url.rstrip('/')}/tts/text_long" |
| data = {"texto": text, "voice": voice, "formato": "mp3"} |
|
|
| try: |
| r = requests.post(url, data=data, timeout=self.timeout * 10) |
| r.raise_for_status() |
| return {"mp3_bytes": r.content} |
| except requests.exceptions.RequestException as e: |
| return {"error": str(e)} |
|
|