| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| from __future__ import annotations |
| import argparse |
| import csv |
| import logging |
| import sys |
| import uuid |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, List, Optional, Tuple |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| log = logging.getLogger("identity_encoding") |
|
|
| |
| |
| try: |
| import chromadb |
| from chromadb.config import Settings |
| except Exception as e: |
| chromadb = None |
| log.error("No se pudo importar chromadb: %s", e) |
|
|
| from libs.vision_tools_salamandra import FaceAnalyzer |
| from collections import Counter |
|
|
| |
| from libs.audio_tools_ana_2 import VoiceEmbedder |
| from libs.vision_tools_salamandra import FaceOfImageEmbedding |
|
|
| |
| try: |
| import numpy as np |
| except Exception: |
| np = None |
|
|
| |
| IMG_EXT = {".jpg", ".jpeg", ".png", ".bmp", ".webp"} |
| AUD_EXT = {".wav", ".mp3", ".flac", ".m4a", ".ogg"} |
|
|
|
|
| def list_files(root: Path, exts: Iterable[str]) -> List[Path]: |
| root = Path(root) |
| if not root.exists(): |
| return [] |
| return [p for p in root.rglob('*') if p.suffix.lower() in exts] |
|
|
|
|
| def ensure_chroma(db_dir: Path): |
| if chromadb is None: |
| raise RuntimeError("chromadb no instalado. pip install chromadb") |
| db_dir.mkdir(parents=True, exist_ok=True) |
| |
| |
| client = chromadb.Client(Settings( |
| chroma_db_impl="duckdb+parquet", |
| persist_directory=str(db_dir) |
| )) |
| return client |
|
|
| |
| def build_faces_index(faces_dir: Path, client, collection_name: str = "index_faces", |
| deepface_model: str = 'Facenet512', drop: bool = True) -> int: |
| |
| if collection_name in [c.name for c in client.list_collections()] and drop: |
| client.delete_collection(name=collection_name) |
| col = client.get_or_create_collection(name=collection_name) |
|
|
| be = FaceOfImageEmbedding(deepface_model=deepface_model) |
| count = 0 |
| registered_identities = set() |
|
|
| for ident_dir in sorted(Path(faces_dir).iterdir() if Path(faces_dir).exists() else []): |
| if not ident_dir.is_dir(): |
| continue |
| ident = ident_dir.name |
| for img_path in list_files(ident_dir, IMG_EXT): |
| embeddings = be.encode_image(img_path) |
| if embeddings is None: |
| log.warning("No face embedding in %s", img_path) |
| continue |
|
|
| |
| for e in (embeddings if isinstance(embeddings[0], list) else [embeddings]): |
| uid = str(uuid.uuid4()) |
| col.add(ids=[uid], embeddings=[e], metadatas=[{"identity": ident, "path": str(img_path)}]) |
| count += 1 |
| registered_identities.add(ident) |
|
|
| |
| print("Ha acabado de crear la base de datos.") |
| print(f"Total de embeddings guardados: {count}") |
| print("Identidades registradas:") |
| for name in sorted(registered_identities): |
| print(f" - {name}") |
|
|
| log.info("index_faces => %d embeddings", count) |
| return count |
|
|
| |
|
|
| def aggregate_face_attributes(faces_dir: Path, out_csv: Path) -> int: |
| """ |
| Procesa un directorio de caras por identidad y genera un CSV con edad y género. |
| Usa FaceAnalyzer para extraer atributos. |
| """ |
| |
| from libs.vision_tools_salamandra import FaceAnalyzer |
| analyzer = FaceAnalyzer() |
|
|
| rows: List[Dict[str, Any]] = [] |
|
|
| faces_dir = Path(faces_dir) |
| if not faces_dir.exists() or not faces_dir.is_dir(): |
| log.error("El directorio de caras no existe: %s", faces_dir) |
| return 0 |
|
|
| def most_common(lst, default="unknown"): |
| return Counter(lst).most_common(1)[0][0] if lst else default |
|
|
| |
| for ident_dir in sorted(faces_dir.iterdir()): |
| if not ident_dir.is_dir(): |
| continue |
| ident = ident_dir.name |
| attrs: List[Dict[str, Any]] = [] |
|
|
| log.info("Procesando identidad: %s", ident) |
|
|
| for img_path in sorted(list_files(ident_dir, IMG_EXT)): |
| try: |
| data = analyzer.analyze_image(str(img_path)) |
| if data: |
| attrs.append(data) |
| except Exception as e: |
| log.warning("Error procesando imagen %s: %s", img_path, e) |
|
|
| genders = [a.get("gender", "unknown") for a in attrs] |
| ages = [a.get("age", "unknown") for a in attrs] |
|
|
| |
| context_txt = (faces_dir.parent / "context" / f"{ident}.txt") |
| identity_context = context_txt.read_text(encoding="utf-8").strip() if context_txt.exists() else "" |
|
|
| rows.append({ |
| "identity": ident, |
| "samples": len(attrs), |
| "gender": most_common(genders), |
| "age_bucket": most_common(ages), |
| "identity_context": identity_context, |
| }) |
|
|
| log.info("Procesados %d atributos para %s", len(attrs), ident) |
|
|
| |
| out_csv.parent.mkdir(parents=True, exist_ok=True) |
| with out_csv.open("w", newline='', encoding="utf-8") as f: |
| fieldnames = list(rows[0].keys()) if rows else ["identity", "identity_context"] |
| writer = csv.DictWriter(f, fieldnames=fieldnames) |
| writer.writeheader() |
| writer.writerows(rows) |
|
|
| log.info("CSV generado correctamente: %s", out_csv) |
| return len(rows) |
|
|
| |
| from pydub import AudioSegment |
|
|
| def build_voices_index(voices_dir: Path, client, collection_name: str = "index_voices", drop: bool = True) -> int: |
| if collection_name in [c.name for c in client.list_collections()] and drop: |
| client.delete_collection(name=collection_name) |
| col = client.get_or_create_collection(name=collection_name) |
|
|
| ve = VoiceEmbedder() |
| count = 0 |
|
|
| for ident_dir in sorted(Path(voices_dir).iterdir() if Path(voices_dir).exists() else []): |
| if not ident_dir.is_dir(): |
| continue |
| ident = ident_dir.name |
| for wav_path in list_files(ident_dir, AUD_EXT): |
| |
| try: |
| emb = ve.embed(wav_path) |
| except Exception as e: |
| log.warning("Error leyendo audio %s: %s. Intentando reconvertir...", wav_path, e) |
| |
| try: |
| audio = AudioSegment.from_file(wav_path) |
| fixed_path = wav_path.with_name(wav_path.stem + "_fixed.wav") |
| audio.export(fixed_path, format="wav") |
| log.info("Archivo convertido a WAV compatible: %s", fixed_path) |
| emb = ve.embed(fixed_path) |
| except Exception as e2: |
| log.error("No se pudo generar embedding tras reconversión para %s: %s", wav_path, e2) |
| continue |
| if emb is None: |
| log.warning("No voice embedding en %s", wav_path) |
| continue |
| uid = str(uuid.uuid4()) |
| col.add(ids=[uid], embeddings=[emb], metadatas=[{"identity": ident, "path": str(wav_path)}]) |
| count += 1 |
|
|
| log.info("index_voices => %d embeddings", count) |
| return count |
|
|
| |
| @dataclass |
| class VisionClient: |
| provider: str = "none" |
|
|
| def describe(self, image_path: str, prompt: str) -> str: |
| return (f"Automatic description (placeholder) for {Path(image_path).name}. " |
| f"{prompt}") |
|
|
|
|
| class TextEmbedder: |
| """Text embeddings with Sentence-Transformers if available; fallback to TF-IDF.""" |
| def __init__(self, model_name: str = "all-MiniLM-L6-v2"): |
| self.kind = "tfidf"; self.model = None; self.vectorizer = None |
| try: |
| from sentence_transformers import SentenceTransformer |
| self.model = SentenceTransformer(model_name) |
| self.kind = "sbert" |
| except Exception: |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| self.vectorizer = TfidfVectorizer(max_features=768) |
|
|
| def fit(self, texts: List[str]): |
| if self.vectorizer is not None: |
| self.vectorizer.fit(texts) |
|
|
| def encode(self, texts: List[str]) -> List[List[float]]: |
| if self.model is not None: |
| arr = self.model.encode(texts, convert_to_numpy=True) |
| return arr.astype(float).tolist() |
| X = self.vectorizer.transform(texts) if self.vectorizer is not None else None |
| return (X.toarray().astype(float).tolist() if X is not None else [[0.0]*128 for _ in texts]) |
|
|
|
|
| def build_scenarios_descriptions(scenarios_dir: Path, out_csv: Path, vision: VisionClient, |
| sample_per_scenario: int = 12) -> Tuple[int, List[Dict[str, Any]]]: |
| rows: List[Dict[str, Any]] = [] |
| for scen_dir in sorted(Path(scenarios_dir).iterdir() if Path(scenarios_dir).exists() else []): |
| if not scen_dir.is_dir(): |
| continue |
| scen = scen_dir.name |
| descs: List[str] = [] |
| imgs = list_files(scen_dir, IMG_EXT)[:sample_per_scenario] |
| for img in imgs: |
| d = vision.describe(str(img), prompt="Describe location, time period, lighting, and atmosphere without mentioning people or time of day.") |
| if d: |
| descs.append(d) |
| if not descs: |
| descs = [f"Scenario {scen} (no images)"] |
| rows.append({"scenario": scen, "descriptions": " \n".join(descs)}) |
|
|
| out_csv.parent.mkdir(parents=True, exist_ok=True) |
| with out_csv.open("w", newline='', encoding="utf-8") as f: |
| w = csv.DictWriter(f, fieldnames=["scenario", "descriptions"]) |
| w.writeheader(); w.writerows(rows) |
| log.info("scenarios_descriptions => %s", out_csv) |
| return len(rows), rows |
|
|
|
|
| def build_scenarios_index(client, rows: List[Dict[str, Any]], embedder: TextEmbedder, |
| collection_name: str = "index_scenarios", drop: bool = True) -> int: |
| texts = [r["descriptions"] for r in rows] |
| embedder.fit(texts) |
| embs = embedder.encode(texts) |
|
|
| if collection_name in [c.name for c in client.list_collections()] and drop: |
| client.delete_collection(name=collection_name) |
| col = client.get_or_create_collection(name=collection_name) |
|
|
| for r, e in zip(rows, embs): |
| col.add(ids=[r["scenario"]], embeddings=[e], metadatas=[{"scenario": r["scenario"]}]) |
| log.info("index_scenarios => %d descriptions", len(rows)) |
| return len(rows) |
|
|
| |
|
|
| def main(): |
| ap = argparse.ArgumentParser(description="Veureu — Build identity/scenario indices and CSVs") |
| ap.add_argument('--faces_dir', default='identities/faces', help='Root directory of face images per identity') |
| ap.add_argument('--voices_dir', default='identities/voices', help='Root directory of voice clips per identity') |
| ap.add_argument('--scenarios_dir', default='scenarios', help='Root directory of scenario folders with images') |
| ap.add_argument('--db_dir', default='chroma_db', help='ChromaDB persistence directory') |
| ap.add_argument('--out_dir', default='results', help='Output directory for CSVs') |
| ap.add_argument('--drop_collections', action='store_true', help='Delete collections if they exist before rebuilding') |
| ap.add_argument('--deepface_model', default='Facenet512', help='DeepFace model to use as fallback') |
| ap.add_argument('--scenario_samples', type=int, default=12, help='Number of images per scenario to describe') |
|
|
| args = ap.parse_args() |
|
|
| faces_dir = Path(args.faces_dir) |
| voices_dir = Path(args.voices_dir) |
| print(voices_dir) |
| scenarios_dir = Path(args.scenarios_dir) |
| out_dir = Path(args.out_dir); out_dir.mkdir(parents=True, exist_ok=True) |
|
|
| client = ensure_chroma(Path(args.db_dir)) |
|
|
| |
| build_faces_index(faces_dir, client, collection_name="index_faces", deepface_model=args.deepface_model, drop=args.drop_collections) |
|
|
| |
| |
| |
|
|
| |
| build_voices_index(voices_dir, client, collection_name="index_voices", drop=args.drop_collections) |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| log.info("✅ Identity encoding completed.") |
|
|
|
|
| if __name__ == '__main__' and '--video' not in sys.argv: |
| main() |
|
|