Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import time | |
| import traceback | |
| from pathlib import Path | |
| from typing import Dict, Any, List, Tuple | |
| import pandas as pd | |
| import gradio as gr | |
| import papermill as pm | |
| import plotly.graph_objects as go | |
# Optional LLM client (HuggingFace Inference API); the name is bound to None
# when the package cannot be imported so the rest of the module still loads.
try:
    from huggingface_hub import InferenceClient
except Exception:
    InferenceClient = None


def _env(name: str, default: str = "") -> str:
    """Return environment variable *name* (or *default*) with whitespace stripped."""
    return os.environ.get(name, default).strip()


# =========================================================
# CONFIG
# =========================================================
BASE_DIR = Path(__file__).resolve().parent

# Notebook file names, resolved against BASE_DIR when they are executed.
NB1 = _env("NB1", "datacreation.ipynb")
NB2 = _env("NB2", "pythonanalysis.ipynb")

# Output locations for executed notebooks and generated artifacts.
RUNS_DIR = BASE_DIR / "runs"
ART_DIR = BASE_DIR / "artifacts"
PY_FIG_DIR = ART_DIR / "py" / "figures"
PY_TAB_DIR = ART_DIR / "py" / "tables"

# Numeric limits, overridable through the environment.
PAPERMILL_TIMEOUT = int(_env("PAPERMILL_TIMEOUT", "1800"))
MAX_PREVIEW_ROWS = int(_env("MAX_FILE_PREVIEW_ROWS", "50"))
MAX_LOG_CHARS = int(_env("MAX_LOG_CHARS", "8000"))

# Chat back-end settings.
HF_API_KEY = _env("HF_API_KEY")
MODEL_NAME = _env("MODEL_NAME", "deepseek-ai/DeepSeek-R1")
HF_PROVIDER = _env("HF_PROVIDER", "novita")
N8N_WEBHOOK_URL = _env("N8N_WEBHOOK_URL")

# The LLM path needs both an API key and an importable client class.
LLM_ENABLED = bool(HF_API_KEY) and InferenceClient is not None
if LLM_ENABLED:
    llm_client = InferenceClient(provider=HF_PROVIDER, api_key=HF_API_KEY)
else:
    llm_client = None
| # ========================================================= | |
| # HELPERS | |
| # ========================================================= | |
def ensure_dirs():
    """Create the run-output and artifact directories if they do not exist."""
    required = (RUNS_DIR, ART_DIR, PY_FIG_DIR, PY_TAB_DIR)
    for directory in required:
        # parents/exist_ok make the call safe to repeat.
        directory.mkdir(parents=True, exist_ok=True)
def stamp():
    """Return the current local time formatted as ``YYYYMMDD-HHMMSS``."""
    now = time.localtime()
    return time.strftime("%Y%m%d-%H%M%S", now)
def tail(text: str, n: int = MAX_LOG_CHARS) -> str:
    """Return at most the last *n* characters of *text*.

    Args:
        text: Source string; ``None`` or ``""`` yields ``""``.
        n: Maximum number of trailing characters to keep.

    Returns:
        The trailing slice of *text*, or ``""`` when *n* is not positive.
    """
    # text[-0:] is the whole string and a negative n would slice from the
    # front, so non-positive limits are handled explicitly.
    if not text or n <= 0:
        return ""
    return text[-n:]
def _ls(dir_path: Path, exts: Tuple[str, ...]) -> List[str]:
    """List the names of regular files in *dir_path* whose suffix is in *exts*.

    The suffix comparison is done on the lower-cased suffix, so *exts* should
    contain lower-case entries such as ``".png"``. A missing or non-directory
    *dir_path* yields an empty list. Names are returned sorted.
    """
    if not dir_path.is_dir():
        return []
    names = []
    for entry in dir_path.iterdir():
        if entry.is_file() and entry.suffix.lower() in exts:
            names.append(entry.name)
    names.sort()
    return names
def _read_csv(path: Path) -> pd.DataFrame:
    """Load at most ``MAX_PREVIEW_ROWS`` rows of the CSV file at *path*."""
    preview = pd.read_csv(path, nrows=MAX_PREVIEW_ROWS)
    return preview
def _read_json(path: Path):
    """Decode the UTF-8 JSON file at *path* and return the resulting object."""
    with path.open(encoding="utf-8") as handle:
        payload = json.load(handle)
    return payload
def artifacts_index() -> Dict[str, Any]:
    """Return the figure and table file names currently present on disk.

    The result has the shape ``{"python": {"figures": [...], "tables": [...]}}``
    where figures are ``.png``/``.jpg``/``.jpeg`` files in ``PY_FIG_DIR`` and
    tables are ``.csv``/``.json`` files in ``PY_TAB_DIR``.
    """
    figure_names = _ls(PY_FIG_DIR, (".png", ".jpg", ".jpeg"))
    table_names = _ls(PY_TAB_DIR, (".csv", ".json"))
    return {"python": {"figures": figure_names, "tables": table_names}}
| # ========================================================= | |
| # PIPELINE RUNNERS | |
| # ========================================================= | |
def run_notebook(nb_name: str) -> str:
    """Execute notebook *nb_name* (relative to ``BASE_DIR``) with papermill.

    The executed copy is written to ``RUNS_DIR`` under a time-stamped name.

    Args:
        nb_name: Notebook path relative to ``BASE_DIR``.

    Returns:
        ``"Executed <nb_name>"`` on success, or ``"ERROR: <nb_name> not found."``
        when the input notebook is not an existing file.

    Raises:
        Any exception raised by ``pm.execute_notebook``; it is not caught here.
    """
    ensure_dirs()
    nb_in = BASE_DIR / nb_name
    # is_file() rather than exists(): an empty name resolves to BASE_DIR itself,
    # which exists but is not a notebook.
    if not nb_in.is_file():
        return f"ERROR: {nb_name} not found."
    # Only the file name goes into the output name; a sub-directory in nb_name
    # would otherwise point into a folder that does not exist under RUNS_DIR.
    nb_out = RUNS_DIR / f"run_{stamp()}_{Path(nb_name).name}"
    pm.execute_notebook(
        input_path=str(nb_in),
        output_path=str(nb_out),
        cwd=str(BASE_DIR),
        log_output=True,
        progress_bar=False,
        request_save_on_cell_execute=True,
        execution_timeout=PAPERMILL_TIMEOUT,
    )
    return f"Executed {nb_name}"
def run_datacreation() -> str:
    """Run the data-creation notebook (``NB1``) and return a log message.

    Returns:
        A string starting with ``"OK "`` followed by the CSV files found in
        ``BASE_DIR`` on success, or starting with ``"FAILED "`` when the
        notebook is missing or its execution raises.
    """
    try:
        log = run_notebook(NB1)
        # run_notebook reports a missing notebook through its return value;
        # without this check the message would read "OK ERROR: ...".
        if log.startswith("ERROR"):
            return f"FAILED {log}"
        csvs = sorted(f.name for f in BASE_DIR.glob("*.csv"))
        listing = "\n".join(f" - {c}" for c in csvs)
        return f"OK {log}\n\nCSVs now in /app:\n" + listing
    except Exception as e:
        # UI boundary: report the failure in the log box instead of raising.
        return f"FAILED {e}\n\n{traceback.format_exc()[-2000:]}"
def run_pythonanalysis() -> str:
    """Run the analysis notebook (``NB2``) and return a log message.

    Returns:
        A string starting with ``"OK "`` followed by the figure and table
        names found afterwards, or starting with ``"FAILED "`` when the
        notebook is missing or its execution raises.
    """
    try:
        log = run_notebook(NB2)
        # run_notebook reports a missing notebook through its return value;
        # without this check the message would read "OK ERROR: ...".
        if log.startswith("ERROR"):
            return f"FAILED {log}"
        found = artifacts_index()["python"]
        figs = found["figures"]
        tabs = found["tables"]
        return (
            f"OK {log}\n\n"
            f"Figures: {', '.join(figs) or '(none)'}\n"
            f"Tables: {', '.join(tabs) or '(none)'}"
        )
    except Exception as e:
        # UI boundary: report the failure in the log box instead of raising.
        return f"FAILED {e}\n\n{traceback.format_exc()[-2000:]}"
def run_full_pipeline() -> str:
    """Run both pipeline steps in order and return their combined log text.

    Step 2 is run regardless of what step 1 reported.
    """
    rule = "=" * 50
    step_one = run_datacreation()
    step_two = run_pythonanalysis()
    sections = [
        rule,
        "STEP 1/2: Data Creation (real data + synthetic enrichment)",
        rule,
        step_one,
        "",
        rule,
        "STEP 2/2: Python Analysis (sentiment, dashboard, decisions)",
        rule,
        step_two,
    ]
    return "\n".join(sections)
| # ========================================================= | |
| # GALLERY LOADERS | |
| # ========================================================= | |
def _load_all_figures() -> List[Tuple[str, str]]:
    """Return ``(path, caption)`` pairs for every figure in ``PY_FIG_DIR``.

    PNG, JPG and JPEG files are included (suffix compared case-insensitively),
    matching the extensions that ``artifacts_index`` reports. The caption is
    the file stem with underscores replaced by spaces, title-cased.

    Returns:
        A list sorted by path; empty when the directory is missing.
    """
    if not PY_FIG_DIR.is_dir():
        return []
    image_suffixes = (".png", ".jpg", ".jpeg")
    return [
        (str(p), p.stem.replace("_", " ").title())
        for p in sorted(PY_FIG_DIR.iterdir())
        if p.is_file() and p.suffix.lower() in image_suffixes
    ]
def _load_table_safe(path: Path) -> pd.DataFrame:
    """Load a CSV or JSON table for preview without ever raising.

    Args:
        path: File to load. A ``.json`` suffix (any case) is parsed as JSON;
            everything else is read as CSV.

    Returns:
        A DataFrame of at most ``MAX_PREVIEW_ROWS`` rows. A JSON object
        becomes a single-row frame. On any failure a one-row frame with an
        ``"error"`` column holding the message is returned instead.
    """
    try:
        # Lower-case the suffix so ".JSON" is treated like ".json", as _ls does.
        if path.suffix.lower() == ".json":
            obj = _read_json(path)
            frame = pd.DataFrame([obj]) if isinstance(obj, dict) else pd.DataFrame(obj)
            # Apply the same row cap that _read_csv applies to CSV files.
            return frame.head(MAX_PREVIEW_ROWS)
        return _read_csv(path)
    except Exception as e:
        # Deliberately broad: the UI shows the error text in the table widget.
        return pd.DataFrame([{"error": str(e)}])
def refresh_gallery():
    """Collect the gallery items, table dropdown update and default table.

    Returns:
        A 3-tuple ``(figures, dropdown_update, dataframe)`` where the first
        table (if any) is pre-selected and loaded.
    """
    gallery_items = _load_all_figures()
    table_names = list(artifacts_index()["python"]["tables"])
    if table_names:
        selected = table_names[0]
        preview = _load_table_safe(PY_TAB_DIR / selected)
    else:
        selected = None
        preview = pd.DataFrame()
    dropdown_update = gr.update(choices=table_names, value=selected)
    return (gallery_items or [], dropdown_update, preview)
def on_table_select(choice: str):
    """Return the preview DataFrame for the table named *choice*.

    An empty choice yields a one-row hint frame; a name that does not exist
    in ``PY_TAB_DIR`` yields a one-row error frame.
    """
    if choice:
        target = PY_TAB_DIR / choice
        if target.exists():
            return _load_table_safe(target)
        return pd.DataFrame([{"error": f"File not found: {choice}"}])
    return pd.DataFrame([{"hint": "Select a table above."}])
| # ========================================================= | |
| # KPI LOADER | |
| # ========================================================= | |
def load_kpis() -> Dict[str, Any]:
    """Load the KPI mapping from ``kpis.json``.

    ``PY_TAB_DIR`` is checked first, then ``BASE_DIR``. A candidate that
    cannot be read, is not valid JSON, or whose top-level value is not an
    object is skipped so the next location gets a chance.

    Returns:
        The decoded KPI dict, or ``{}`` when no usable file is found.
    """
    for candidate in (PY_TAB_DIR / "kpis.json", BASE_DIR / "kpis.json"):
        if not candidate.is_file():
            continue
        try:
            data = _read_json(candidate)
        except (OSError, ValueError):
            # Unreadable or malformed (JSON and Unicode decode errors are
            # ValueError subclasses): best effort, try the next location.
            continue
        # Callers use .get()/.items(), so only a JSON object is acceptable.
        if isinstance(data, dict):
            return data
    return {}
| # ========================================================= | |
| # KPI CARDS | |
| # ========================================================= | |
def render_kpi_cards() -> str:
    """Render the KPI values from ``load_kpis()`` as an HTML card grid.

    Known keys are shown first, in a fixed order with their own icon, label
    and accent colour; any other keys follow with a generic card. Labels and
    values are HTML-escaped because they come from a JSON file on disk.

    Returns:
        An HTML fragment: the card grid, or a placeholder panel when no KPI
        data is available.
    """
    # Local import: the accumulator in this function used to be named `html`.
    from html import escape

    kpis = load_kpis()
    if not kpis:
        return (
            '<div style="background:rgba(255,255,255,.65);backdrop-filter:blur(16px);'
            'border-radius:20px;padding:28px;text-align:center;'
            'border:1.5px solid rgba(255,255,255,.7);'
            'box-shadow:0 8px 32px rgba(124,92,191,.08);">'
            '<div style="font-size:36px;margin-bottom:10px;">π</div>'
            '<div style="color:#a48de8;font-size:14px;font-weight:800;margin-bottom:6px;">No KPI data yet</div>'
            '<div style="color:#9d8fc4;font-size:12px;">Run the pipeline or upload kpis.json to populate these cards.</div>'
            '</div>'
        )

    def card(icon, label, value, colour):
        """Return the HTML for one card; *label* and *value* are escaped."""
        safe_label = escape(str(label))
        safe_value = escape(str(value))
        return (
            f'<div style="background:rgba(255,255,255,.72);backdrop-filter:blur(16px);'
            f'border-radius:20px;padding:18px 14px 16px;text-align:center;'
            f'border:1.5px solid rgba(255,255,255,.8);'
            f'box-shadow:0 4px 16px rgba(124,92,191,.08);border-top:3px solid {colour};">'
            f'<div style="font-size:26px;margin-bottom:7px;line-height:1;">{icon}</div>'
            f'<div style="color:#9d8fc4;font-size:9.5px;text-transform:uppercase;'
            f'letter-spacing:1.8px;margin-bottom:7px;font-weight:800;">{safe_label}</div>'
            f'<div style="color:#2d1f4e;font-size:16px;font-weight:800;">{safe_value}</div>'
            f'</div>'
        )

    # Food-review KPI keys mapped to (icon, label, accent colour).
    # NOTE(review): the icon glyphs look mis-encoded (probably emoji) and are
    # kept exactly as found; restore them from the original file if possible.
    kpi_config = [
        ("total_reviews", "π§Ύ", "Total Reviews", "#a48de8"),
        ("real_reviews", "π¦", "Real Reviews", "#7aa6f8"),
        ("synthetic_reviews", "π€", "Synthetic", "#6ee7c7"),
        ("unique_products", "π", "Unique Products", "#3dcba8"),
        ("avg_rating", "β", "Avg Rating", "#e8a230"),
        ("pct_positive", "π", "% Positive", "#2ec4a0"),
        ("pct_negative", "π", "% Negative", "#e8537a"),
        ("avg_sentiment_score", "π", "Avg Sentiment", "#5e8fef"),
    ]

    cards = []
    shown = set()
    for key, icon, label, colour in kpi_config:
        val = kpis.get(key)
        if val is None:
            continue
        shown.add(key)
        if isinstance(val, float):
            display_val = f"{val:.2f}"
        elif isinstance(val, int) and val > 999:
            display_val = f"{val:,}"
        else:
            display_val = str(val)
        cards.append(card(icon, label, display_val, colour))

    # Any extra keys that are not part of the fixed configuration.
    for key, val in kpis.items():
        if key in shown:
            continue
        label = str(key).replace("_", " ").title()
        if isinstance(val, (int, float)) and val > 100:
            display_val = f"{val:,.0f}"
        else:
            display_val = str(val)
        cards.append(card("π", label, display_val, "#8fa8f8"))

    grid_open = (
        '<div style="display:grid;grid-template-columns:repeat(auto-fit,minmax(130px,1fr));'
        'gap:12px;margin-bottom:24px;">'
    )
    return grid_open + "".join(cards) + "</div>"
# =========================================================
# INTERACTIVE PLOTLY CHARTS - Food Reviews
# =========================================================
# Ten fallback colours, used in order for bars that have no
# data-driven colour.
CHART_PALETTE = [
    "#7c5cbf",
    "#2ec4a0",
    "#e8537a",
    "#e8a230",
    "#5e8fef",
    "#c45ea8",
    "#3dbacc",
    "#a0522d",
    "#6aaa3a",
    "#d46060",
]
def _merge_layout(base: dict, overrides: dict) -> dict:
    """Return *base* updated with *overrides*, merging nested dicts recursively."""
    merged = dict(base)
    for key, value in overrides.items():
        current = merged.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            merged[key] = _merge_layout(current, value)
        else:
            merged[key] = value
    return merged


def _styled_layout(**kwargs) -> dict:
    """Build Plotly layout keyword arguments with the app's default styling.

    Args:
        **kwargs: Layout properties that override or extend the defaults.
            Dict-valued properties (``title``, ``legend``, ``font``, ...) are
            merged into the matching default, so passing
            ``title=dict(text=...)`` keeps the default title font instead of
            replacing the whole ``title`` entry.

    Returns:
        A dict suitable for ``fig.update_layout(**result)``.
    """
    defaults = dict(
        template="plotly_white",
        paper_bgcolor="rgba(255,255,255,0.95)",
        plot_bgcolor="rgba(255,255,255,0.98)",
        font=dict(family="system-ui, sans-serif", color="#2d1f4e", size=12),
        margin=dict(l=60, r=20, t=70, b=70),
        legend=dict(
            orientation="h", yanchor="bottom", y=1.02,
            xanchor="right", x=1,
            bgcolor="rgba(255,255,255,0.92)",
            bordercolor="rgba(124,92,191,0.35)", borderwidth=1,
        ),
        title=dict(font=dict(size=15, color="#4b2d8a")),
    )
    return _merge_layout(defaults, kwargs)
def _empty_chart(title: str) -> go.Figure:
    """Return a placeholder figure titled *title* with a centred hint text."""
    hint = dict(
        text="Run the pipeline to generate data",
        x=0.5, y=0.5, xref="paper", yref="paper",
        showarrow=False,
        font=dict(size=14, color="rgba(124,92,191,0.5)"),
    )
    placeholder = go.Figure()
    placeholder.update_layout(
        title=title,
        height=420,
        template="plotly_white",
        paper_bgcolor="rgba(255,255,255,0.95)",
        annotations=[hint],
    )
    return placeholder
def build_sales_chart() -> go.Figure:
    """Build the rating and sentiment overview chart from ``df_dashboard.csv``.

    The file is looked up in ``PY_TAB_DIR`` first, then ``BASE_DIR``. Bars
    show ``n_reviews`` per ``sentiment_label``; a line on a secondary axis
    shows ``avg_rating``. Either series is skipped when its column is absent.

    Returns:
        The chart, or a placeholder figure carrying a message when the file
        is missing, unreadable, lacks ``sentiment_label``, or has neither
        ``n_reviews`` nor ``avg_rating`` to plot.
    """
    candidates = (PY_TAB_DIR / "df_dashboard.csv", BASE_DIR / "df_dashboard.csv")
    path = next((c for c in candidates if c.exists()), None)
    if path is None:
        return _empty_chart("Rating & Sentiment Overview β run the pipeline first")

    try:
        df = pd.read_csv(path)
    except Exception as e:
        return _empty_chart(f"Error reading df_dashboard.csv: {e}")

    if "sentiment_label" not in df.columns:
        return _empty_chart("sentiment_label column not found in df_dashboard.csv")

    has_counts = "n_reviews" in df.columns
    has_rating = "avg_rating" in df.columns
    if not (has_counts or has_rating):
        # Without either column the figure would be blank with no explanation.
        return _empty_chart("n_reviews / avg_rating columns not found in df_dashboard.csv")

    fig = go.Figure()

    # Bars: number of reviews per sentiment, coloured by sentiment.
    if has_counts:
        sentiment_colours = {"positive": "#2ec4a0", "negative": "#e8537a"}
        colors = [
            sentiment_colours.get(str(s).lower(), "#5e8fef")
            for s in df["sentiment_label"]
        ]
        fig.add_trace(go.Bar(
            x=df["sentiment_label"],
            y=df["n_reviews"],
            name="Number of Reviews",
            marker_color=colors,
            hovertemplate="<b>%{x}</b><br>Reviews: %{y}<extra></extra>",
        ))

    # Line: average rating per sentiment on the secondary axis.
    if has_rating:
        fig.add_trace(go.Scatter(
            x=df["sentiment_label"],
            y=df["avg_rating"],
            name="Avg Rating",
            mode="lines+markers",
            line=dict(color="#7c5cbf", width=3),
            marker=dict(size=10),
            yaxis="y2",
            hovertemplate="<b>%{x}</b><br>Avg Rating: %{y:.2f}β<extra></extra>",
        ))

    fig.update_layout(
        **_styled_layout(
            height=420,
            title=dict(text="Reviews & Avg Rating by Sentiment"),
            yaxis=dict(title="Number of Reviews"),
            yaxis2=dict(
                title="Avg Star Rating",
                overlaying="y", side="right",
                range=[0, 5.5], showgrid=False,
            ),
            barmode="group",
        )
    )
    return fig
def build_sentiment_chart() -> go.Figure:
    """Build the sentiment distribution donut chart from ``df_dashboard.csv``.

    The file is looked up in ``PY_TAB_DIR`` first, then ``BASE_DIR``. Slice
    sizes come from ``n_reviews`` when present, otherwise from the first
    numeric column.

    Returns:
        The chart, or a placeholder figure carrying a message when the file
        is missing, unreadable, lacks ``sentiment_label``, or has no numeric
        column to use for slice sizes.
    """
    candidates = (PY_TAB_DIR / "df_dashboard.csv", BASE_DIR / "df_dashboard.csv")
    path = next((c for c in candidates if c.exists()), None)
    if path is None:
        return _empty_chart("Sentiment Distribution β run the pipeline first")

    try:
        df = pd.read_csv(path)
    except Exception as e:
        return _empty_chart(f"Error reading df_dashboard.csv: {e}")

    if "sentiment_label" not in df.columns:
        return _empty_chart("sentiment_label column not found in df_dashboard.csv")

    if "n_reviews" in df.columns:
        metric_col = "n_reviews"
    else:
        numeric_cols = df.select_dtypes("number").columns
        if len(numeric_cols) == 0:
            # Indexing [0] on an empty column index would raise IndexError.
            return _empty_chart("No numeric column to plot in df_dashboard.csv")
        metric_col = numeric_cols[0]

    color_map = {
        "positive": "#2ec4a0",
        "neutral": "#5e8fef",
        "negative": "#e8537a",
    }
    colors = [
        color_map.get(str(s).lower(), "#888")
        for s in df["sentiment_label"]
    ]

    fig = go.Figure(go.Pie(
        labels=df["sentiment_label"],
        values=df[metric_col],
        marker=dict(colors=colors, line=dict(color="white", width=2)),
        textinfo="label+percent",
        hovertemplate="<b>%{label}</b><br>Reviews: %{value}<br>Share: %{percent}<extra></extra>",
        hole=0.35,
    ))
    fig.update_layout(
        **_styled_layout(
            height=420,
            title=dict(text="Sentiment Distribution"),
        )
    )
    return fig
def build_top_sellers_chart() -> go.Figure:
    """Build the horizontal top-products bar chart from ``product_performance.csv``.

    The file is looked up in ``PY_TAB_DIR`` first, then ``BASE_DIR``. The
    label column is the first one whose name contains "name" or "product"
    (else the first column). The value column is the first one whose name
    contains "rating" (else the first numeric column). The ten rows with the
    highest value are plotted; when ``n_reviews`` exists it is shown in the
    hover text.

    Returns:
        The chart, or a placeholder figure carrying a message when the file
        is missing, unreadable, has no usable value column, or has no rows
        left after dropping missing values.
    """
    candidates = (
        PY_TAB_DIR / "product_performance.csv",
        BASE_DIR / "product_performance.csv",
    )
    path = next((c for c in candidates if c.exists()), None)
    if path is None:
        return _empty_chart("Top Products β run the pipeline first")

    try:
        df = pd.read_csv(path)
    except Exception as e:
        return _empty_chart(f"Error reading product_performance.csv: {e}")

    # Pick the label column and the value column.
    name_col = next(
        (c for c in df.columns
         if "name" in str(c).lower() or "product" in str(c).lower()),
        df.columns[0],
    )
    numeric_cols = df.select_dtypes("number").columns
    val_col = next(
        (c for c in df.columns if "rating" in str(c).lower()),
        numeric_cols[0] if len(numeric_cols) > 0 else None,
    )
    if val_col is None:
        # Falling back to df.columns[1] would plot text or raise IndexError.
        return _empty_chart("No numeric column to rank by in product_performance.csv")

    df = df.dropna(subset=[name_col, val_col])
    if df.empty:
        return _empty_chart("product_performance.csv contains no rows to plot")
    df = df.sort_values(val_col, ascending=True).tail(10)

    # Colour by positive_ratio when available, else use the fixed palette.
    if "positive_ratio" in df.columns:
        bar_colors = [
            f"rgba({int(46 + x*150)},{int(196 - x*50)},{int(160 + x*30)},0.85)"
            for x in df["positive_ratio"].fillna(0.5)
        ]
    else:
        bar_colors = CHART_PALETTE[: len(df)]

    # A hover template must be a single string; per-row review counts are
    # attached through customdata and referenced from the template.
    value_label = str(val_col).replace("_", " ").title()
    hover = "<b>%{y}</b><br>" + value_label + ": %{x:.2f}"
    bar_kwargs = {}
    if "n_reviews" in df.columns:
        bar_kwargs["customdata"] = df["n_reviews"]
        hover += "<br>Reviews: %{customdata}"
    hover += "<extra></extra>"

    fig = go.Figure(go.Bar(
        y=df[name_col],
        x=df[val_col],
        orientation="h",
        marker_color=bar_colors,
        hovertemplate=hover,
        **bar_kwargs,
    ))
    fig.update_layout(
        **_styled_layout(
            height=max(380, len(df) * 50),
            title=dict(text="Products Ranked by Average Rating"),
            showlegend=False,
        )
    )
    fig.update_xaxes(title="Average Star Rating", range=[0, 5.5])
    # NOTE(review): rows are sorted ascending and the axis is reversed, which
    # puts the highest value at the bottom; confirm that this is intended.
    fig.update_yaxes(autorange="reversed")
    return fig
def refresh_dashboard():
    """Rebuild the KPI cards and the three interactive charts.

    Returns:
        A 4-tuple ``(kpi_html, sales_chart, sentiment_chart, top_chart)``.
    """
    kpi_cards = render_kpi_cards()
    sales_fig = build_sales_chart()
    sentiment_fig = build_sentiment_chart()
    top_fig = build_top_sellers_chart()
    return kpi_cards, sales_fig, sentiment_fig, top_fig
| # ========================================================= | |
| # AI DASHBOARD | |
| # ========================================================= | |
# System prompt template for the LLM chat path. It is filled with
# str.format(artifacts_json=..., kpis_json=...) in ai_chat, which is why the
# literal braces of the JSON example are doubled ({{ ... }}).
# The reply is expected to end with a fenced json block; that block is what
# _parse_display_directive extracts.
DASHBOARD_SYSTEM = """You are an AI dashboard assistant for a food e-commerce analytics app.
The user asks questions about Amazon food product reviews analysed with sentiment analysis.
AVAILABLE ARTIFACTS (only reference ones that exist):
{artifacts_json}
KPI SUMMARY: {kpis_json}
YOUR JOB:
1. Answer the user's question conversationally using the KPIs and your knowledge of the artifacts.
2. At the END of your response, output a JSON block (fenced with ```json ... ```) that tells
the dashboard which artifact to display:
{{"show": "figure"|"table"|"none", "scope": "python", "filename": "...", "chart": "sales"|"sentiment"|"top_sellers"|""}}
RULES:
- sentiment / reviews / positive / negative β chart: "sentiment"
- rating / score / overview / trend β chart: "sales"
- top / best / product / popular / rank β chart: "top_sellers"
- churn / risk / decision / pricing β show table: "business_decisions.csv"
- dashboard / summary / kpi β show table: "df_dashboard.csv"
- pain points / complaints / negative reviews β show table: "top_negative_reviews.csv"
Keep answers concise (2-4 sentences) then the JSON block.
"""
# Fenced directive: a json code fence wrapping a single JSON object.
JSON_BLOCK_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL)
# Unfenced directive: a brace-delimited object without nested braces that
# contains a "show" key.
FALLBACK_JSON_RE = re.compile(r"\{[^{}]*\"show\"[^{}]*\}", re.DOTALL)


def _parse_display_directive(text: str) -> Dict[str, str]:
    """Extract the display directive from an LLM reply.

    The fenced json block is tried first; if it is absent or not valid JSON,
    an unfenced object containing ``"show"`` is tried.

    Returns:
        The decoded directive dict, or ``{"show": "none"}`` when nothing
        parseable is found.
    """
    fenced = JSON_BLOCK_RE.search(text)
    bare = FALLBACK_JSON_RE.search(text)
    candidates = []
    if fenced:
        candidates.append(fenced.group(1))
    if bare:
        candidates.append(bare.group(0))
    for raw in candidates:
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return {"show": "none"}


def _drop_if_directive(match) -> str:
    """Return ``""`` for a match that is valid JSON, else the matched text."""
    try:
        json.loads(match.group(0))
    except json.JSONDecodeError:
        return match.group(0)
    return ""


def _clean_response(text: str) -> str:
    """Return *text* with the display directive removed and whitespace trimmed.

    Both forms accepted by ``_parse_display_directive`` are stripped: the
    fenced json block and an unfenced ``{"show": ...}`` object, so the raw
    directive never shows up in the chat reply.
    """
    without_fenced = JSON_BLOCK_RE.sub("", text)
    return FALLBACK_JSON_RE.sub(_drop_if_directive, without_fenced).strip()
def _n8n_call(msg: str) -> Tuple[str, Dict]:
    """Send *msg* to the n8n webhook and translate its reply.

    The webhook is expected to answer with JSON holding ``"answer"`` and,
    optionally, ``"chart"``. A JSON array is accepted too, in which case its
    first element is used.

    Returns:
        ``(answer, directive)``. The directive is ``{"show": "figure",
        "chart": ...}`` when a chart is named and ``{"show": "none"}``
        otherwise. On any failure the pair is ``(error message, None)``;
        callers test for ``None`` to trigger the keyword fallback.
    """
    try:
        # Imported inside the try so a missing package also falls back.
        import requests as req

        resp = req.post(N8N_WEBHOOK_URL, json={"question": msg}, timeout=20)
        # Treat HTTP error statuses as failures instead of parsing their body.
        resp.raise_for_status()
        data = resp.json()
        if isinstance(data, list) and data:
            data = data[0]
        if not isinstance(data, dict):
            raise ValueError(f"unexpected response type: {type(data).__name__}")
        # `or` also covers a null or empty answer, not just a missing key.
        answer = str(data.get("answer") or "No response from n8n workflow.")
        chart = data.get("chart", "none")
        if chart and chart != "none":
            return answer, {"show": "figure", "chart": chart}
        return answer, {"show": "none"}
    except Exception as e:
        # Boundary with an external service: never let it break the chat.
        return f"n8n error: {e}. Falling back to keyword matching.", None
def _keyword_fallback(msg: str, idx: Dict, kpis: Dict) -> Tuple[str, Dict]:
    """Choose a reply and display directive by keyword matching on *msg*.

    Args:
        msg: The user's question.
        idx: Artifact index shaped like ``artifacts_index()`` output.
        kpis: KPI mapping; may be empty.

    Returns:
        ``(reply, directive)``. Rules are tried in order and the first one
        with a keyword contained in the lower-cased message wins; matching is
        by substring, not whole word. With no artifacts at all a "run the
        pipeline" hint is returned; with no keyword hit the sentiment chart
        is shown.
    """
    msg_lower = msg.lower()

    if not idx["python"]["figures"] and not idx["python"]["tables"]:
        return (
            "No artifacts found yet. Please run the pipeline first (Tab 1), "
            "then come back here to explore the results.",
            {"show": "none"},
        )

    # Build a short KPI summary string.
    kpi_text = ""
    if kpis:
        parts = []
        if "total_reviews" in kpis:
            total = kpis["total_reviews"]
            # The "," format spec raises ValueError for non-numeric values
            # (e.g. a string read from JSON), so only numbers are grouped.
            total_txt = f"{total:,}" if isinstance(total, (int, float)) else str(total)
            parts.append(f"**{total_txt}** total reviews")
        if "unique_products" in kpis:
            parts.append(f"**{kpis['unique_products']}** unique products")
        if "avg_rating" in kpis:
            parts.append(f"avg rating **{kpis['avg_rating']}β**")
        if "pct_positive" in kpis:
            parts.append(f"**{kpis['pct_positive']}%** positive reviews")
        if parts:
            kpi_text = "Quick summary: " + ", ".join(parts) + "."

    # (keywords, reply template, directive), evaluated in order.
    rules = (
        (
            ("sentiment", "positive", "negative", "distribution", "review"),
            "Here is the sentiment distribution across food reviews. {kpi}",
            {"show": "figure", "chart": "sentiment"},
        ),
        (
            ("top", "best", "product", "popular", "rank", "seller"),
            "Here are the top products ranked by average rating. {kpi}",
            {"show": "figure", "chart": "top_sellers"},
        ),
        (
            ("rating", "score", "star", "overview", "trend", "monthly"),
            "Here is the rating and sentiment overview. {kpi}",
            {"show": "figure", "chart": "sales"},
        ),
        (
            ("churn", "risk", "decision", "pricing", "action"),
            "Here are the business decisions per product. {kpi}",
            {"show": "table", "scope": "python", "filename": "business_decisions.csv"},
        ),
        (
            ("pain", "complaint", "problem", "issue", "worst"),
            "Here are the most helpful negative reviews. {kpi}",
            {"show": "table", "scope": "python", "filename": "top_negative_reviews.csv"},
        ),
        (
            # "overview" is already claimed by the rating rule above, so it
            # never selects this rule on its own.
            ("dashboard", "summary", "kpi", "overview", "data"),
            "Dashboard overview. {kpi}\n\n"
            "Ask me about: **sentiment distribution**, **product ratings**, "
            "**top products**, **churn risk**, or **business decisions**.",
            {"show": "table", "scope": "python", "filename": "df_dashboard.csv"},
        ),
    )
    for keywords, template, directive in rules:
        if any(word in msg_lower for word in keywords):
            return template.format(kpi=kpi_text), dict(directive)

    # Default when no keyword matched.
    return (
        f"I can help you explore the food review data. {kpi_text}\n\n"
        "Try asking about: **sentiment distribution**, **top products**, "
        "**product ratings**, **churn risk**, or **business decisions**.",
        {"show": "figure", "chart": "sentiment"},
    )
def ai_chat(user_msg: str, history: list):
    """Answer *user_msg* and pick the chart or table to show next to the chat.

    Back-ends are tried in priority order: the n8n webhook when
    ``N8N_WEBHOOK_URL`` is set, else the HuggingFace LLM when
    ``LLM_ENABLED``, else keyword matching. A failing webhook or LLM call
    falls back to keyword matching and the error is included in the reply.

    Args:
        user_msg: The text typed by the user.
        history: Prior chat messages as ``{"role", "content"}`` dicts, or None.

    Returns:
        ``(new_history, "", chart, table)``. The empty string clears the
        input box; ``chart`` and ``table`` are ``None`` when nothing is shown.
    """
    if not user_msg or not user_msg.strip():
        return history, "", None, None

    idx = artifacts_index()
    kpis = load_kpis()

    # Priority: n8n webhook -> HF LLM -> keyword fallback
    if N8N_WEBHOOK_URL:
        reply, directive = _n8n_call(user_msg)
        if directive is None:
            reply_fb, directive = _keyword_fallback(user_msg, idx, kpis)
            reply += "\n\n" + reply_fb
    elif not LLM_ENABLED:
        reply, directive = _keyword_fallback(user_msg, idx, kpis)
    else:
        system = DASHBOARD_SYSTEM.format(
            artifacts_json=json.dumps(idx, indent=2),
            kpis_json=(json.dumps(kpis, indent=2)
                       if kpis else "(no KPIs yet β run the pipeline first)"),
        )
        msgs = [{"role": "system", "content": system}]
        # Forward only role/content of the last six messages; any other keys
        # the chat widget may attach are not part of the chat-completion schema.
        for entry in (history or [])[-6:]:
            if isinstance(entry, dict) and "role" in entry and "content" in entry:
                msgs.append({"role": entry["role"], "content": entry["content"]})
        msgs.append({"role": "user", "content": user_msg})
        try:
            r = llm_client.chat_completion(
                model=MODEL_NAME, messages=msgs,
                temperature=0.3, max_tokens=600, stream=False,
            )
            raw = (
                r["choices"][0]["message"]["content"]
                if isinstance(r, dict)
                else r.choices[0].message.content
            )
            directive = _parse_display_directive(raw)
            reply = _clean_response(raw)
        except Exception as e:
            reply = f"LLM error: {e}. Falling back to keyword matching."
            reply_fb, directive = _keyword_fallback(user_msg, idx, kpis)
            reply += "\n\n" + reply_fb

    # The directive comes from a model or an external service: coerce it to
    # plain strings so an unexpected type cannot raise below.
    if not isinstance(directive, dict):
        directive = {"show": "none"}
    show = str(directive.get("show") or "none")
    chart_name = str(directive.get("chart") or "")
    # Keep only the base name so "../"-style values cannot leave the table
    # folders.
    fname = Path(str(directive.get("filename") or "")).name

    # Resolve the directive to a chart or a table.
    chart_out = None
    tab_out = None
    chart_builders = {
        "sales": build_sales_chart,
        "sentiment": build_sentiment_chart,
        "top_sellers": build_top_sellers_chart,
    }
    if chart_name in chart_builders:
        chart_out = chart_builders[chart_name]()
    elif show == "figure" and fname:
        # No known chart key: guess the chart from the file name.
        if "sentiment" in fname:
            chart_out = build_sentiment_chart()
        elif "product" in fname or "seller" in fname or "top" in fname:
            chart_out = build_top_sellers_chart()
        else:
            chart_out = build_sales_chart()

    if show == "table" and fname:
        # Try the tables folder first, then the app root.
        for fp in (PY_TAB_DIR / fname, BASE_DIR / fname):
            if fp.is_file():
                tab_out = _load_table_safe(fp)
                break
        if tab_out is None:
            reply += f"\n\n*(Could not find table: {fname})*"

    new_history = (history or []) + [
        {"role": "user", "content": user_msg},
        {"role": "assistant", "content": reply},
    ]
    return new_history, "", chart_out, tab_out
# =========================================================
# UI
# =========================================================
# Make sure the output folders exist before any component reads from them.
ensure_dirs()


def load_css() -> str:
    """Return the text of ``style.css`` next to this file, or ``""`` if absent."""
    stylesheet = BASE_DIR / "style.css"
    if not stylesheet.exists():
        return ""
    return stylesheet.read_text(encoding="utf-8")
with gr.Blocks(title="AIBDM 2026 Workshop App") as demo:
    gr.Markdown(
        "# SE21 App Template\n"
        "*E-Commerce Food Review Intelligence Dashboard*",
        elem_id="escp_title",
    )

    # -- TAB 1: Pipeline Runner --
    with gr.Tab("Pipeline Runner"):
        gr.Markdown(
            "Run the notebooks to generate data and analysis artifacts. "
            "If you have already uploaded the CSV files, you can skip Step 1 "
            "and go straight to the Dashboard tab."
        )
        with gr.Row():
            with gr.Column(scale=1):
                btn_nb1 = gr.Button("Step 1: Data Creation", variant="secondary")
            with gr.Column(scale=1):
                btn_nb2 = gr.Button("Step 2: Python Analysis", variant="secondary")
        with gr.Row():
            btn_all = gr.Button("Run Full Pipeline (Both Steps)", variant="primary")
        run_log = gr.Textbox(
            label="Execution Log",
            lines=18,
            max_lines=30,
            interactive=False,
        )
        # Every runner writes its log text into the same textbox.
        btn_nb1.click(run_datacreation, outputs=[run_log])
        btn_nb2.click(run_pythonanalysis, outputs=[run_log])
        btn_all.click(run_full_pipeline, outputs=[run_log])

    # -- TAB 2: Dashboard --
    with gr.Tab("Dashboard"):
        # A callable value is evaluated by Gradio when the page loads.
        kpi_html = gr.HTML(value=render_kpi_cards)
        refresh_btn = gr.Button("π Refresh Dashboard", variant="primary")

        gr.Markdown("#### Interactive Charts")
        chart_sales = gr.Plot(label="Rating & Sentiment Overview")
        chart_sentiment = gr.Plot(label="Sentiment Distribution")
        chart_top = gr.Plot(label="Products by Avg Rating")

        gr.Markdown("#### Static Figures (from notebooks)")
        gallery = gr.Gallery(
            label="Generated Figures",
            columns=2,
            height=480,
            object_fit="contain",
        )

        gr.Markdown("#### Data Tables")
        table_dropdown = gr.Dropdown(
            label="Select a table to view",
            choices=[],
            interactive=True,
        )
        table_display = gr.Dataframe(label="Table Preview", interactive=False)

        def _on_refresh():
            """Return KPI cards, three charts, gallery items, dropdown update and table."""
            kpi, c1, c2, c3 = refresh_dashboard()
            figs, dd, df = refresh_gallery()
            return (kpi, c1, c2, c3, figs, dd, df)

        refresh_btn.click(
            _on_refresh,
            outputs=[
                kpi_html,
                chart_sales,
                chart_sentiment,
                chart_top,
                gallery,
                table_dropdown,
                table_display,
            ],
        )
        table_dropdown.change(
            on_table_select,
            inputs=[table_dropdown],
            outputs=[table_display],
        )

    # -- TAB 3: AI Dashboard --
    with gr.Tab('"AI" Dashboard'):
        # Describe which chat back-end is active, in the same priority order
        # that ai_chat uses.
        if N8N_WEBHOOK_URL:
            _ai_status = "Connected to your **n8n workflow**."
        elif LLM_ENABLED:
            _ai_status = "**LLM active.**"
        else:
            _ai_status = (
                "Using **keyword matching**. Set `N8N_WEBHOOK_URL` to connect "
                "your n8n workflow, or set `HF_API_KEY` for direct LLM access."
            )
        gr.Markdown(
            "### Ask questions about your food review data\n\n"
            f"Type a question and the system picks the right chart or table. {_ai_status}"
        )
        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(label="Conversation", height=380)
                user_input = gr.Textbox(
                    label="Ask about your data",
                    placeholder=(
                        "e.g. Show sentiment distribution / "
                        "Which products have the best ratings? / "
                        "What are the main customer complaints?"
                    ),
                    lines=1,
                )
                gr.Examples(
                    examples=[
                        "Show me the sentiment distribution",
                        "Which products have the best ratings?",
                        "What are the top products?",
                        "Show the business decisions",
                        "What do negative reviews say?",
                        "Give me a dashboard overview",
                    ],
                    inputs=user_input,
                )
            with gr.Column(scale=1):
                ai_figure = gr.Plot(label="Interactive Chart")
                ai_table = gr.Dataframe(label="Data Table", interactive=False)
        # Submitting the textbox runs the chat and clears the input.
        user_input.submit(
            ai_chat,
            inputs=[user_input, chatbot],
            outputs=[chatbot, user_input, ai_figure, ai_table],
        )

demo.launch(css=load_css(), allowed_paths=[str(BASE_DIR)])