import os import sys import json import subprocess import argparse import shutil import platform CONFIG_PATH = "setup_config.json" ENVS_FILE = "envs.json" IS_WIN = os.name == 'nt' _PY_PATH = f'"{os.path.join("{dir}", "Scripts", "python.exe")}"' if IS_WIN else f'"{os.path.join("{dir}", "bin", "python")}"' ENV_TEMPLATES = { "uv": { "create": "uv venv --seed --python {ver} \"{dir}\"", "run": _PY_PATH, "install": "uv pip install --index-strategy unsafe-best-match --python \"{dir}\"" }, "venv": { "create": "\"{sys_py}\" -m venv \"{dir}\"", "run": _PY_PATH, "install": f"{_PY_PATH} -m pip install" }, "conda": { "create": "conda create -y -p \"{dir}\" python={ver}", "run": os.path.join("{dir}", "python.exe"), "install": "conda run -p \"{dir}\" pip install" }, "none": { "create": "", "run": "python" if IS_WIN else "python3", "install": "pip install" } } VERSION_CHECK_SCRIPT = """ import sys import importlib import importlib.metadata pkgs = ['torch', 'triton', 'sageattention', 'spas_sage_attn', 'flash_attn'] res = [] try: res.append(f"python={sys.version.split()[0]}") except: res.append("python=Unknown") for p in pkgs: try: ver = importlib.metadata.version(p) res.append(f"{p}={ver}") except importlib.metadata.PackageNotFoundError: try: # Fallback to __version__ m = importlib.import_module(p) ver = getattr(m, '__version__', 'Installed') res.append(f"{p}={ver}") except ImportError: res.append(f"{p}=Missing") except Exception: res.append(f"{p}=Error") print("||".join(res)) """ class EnvsManager: def __init__(self): self.data = {"active": None, "envs": {}} self.load() def load(self): if os.path.exists(ENVS_FILE): try: with open(ENVS_FILE, 'r') as f: self.data = json.load(f) except: print(f"[!] Warning: {ENVS_FILE} corrupted. Starting fresh.") def save(self): with open(ENVS_FILE, 'w') as f: json.dump(self.data, f, indent=4) def get_active(self): return self.data.get("active") def set_active(self, name): if name in self.data["envs"]: self.data["active"] = name self.save() print(f"[*] '{name}' is now the active environment.") else: print(f"[!] Environment '{name}' not found.") def add_env(self, name, type, path): if path: cwd = os.getcwd() abs_path = os.path.abspath(path) try: rel_path = os.path.relpath(abs_path, cwd) if rel_path.startswith("..") or rel_path == ".": final_path = abs_path else: final_path = os.path.join(".", rel_path) except ValueError: final_path = abs_path else: final_path = "" self.data["envs"][name] = {"type": type, "path": final_path} if not self.data["active"]: self.data["active"] = name self.save() def remove_env(self, name): if name in self.data["envs"]: entry = self.data["envs"][name] path = entry["path"] if os.path.exists(path) and entry["type"] != "none": try: print(f"[*] Deleting directory: {path}") if entry["type"] == "conda": run_cmd(f"conda env remove -p \"{path}\" -y") else: shutil.rmtree(path) except Exception as e: print(f"[!] Error removing directory: {e}") del self.data["envs"][name] if self.data["active"] == name: self.data["active"] = None keys = list(self.data["envs"].keys()) if keys: self.data["active"] = keys[0] print(f"[*] Active environment switched to '{keys[0]}'.") else: print("[*] No environments left.") self.save() def list_envs(self): return self.data["envs"] def resolve_target_env(self): """Intelligently determine which env to use for operations.""" envs = self.list_envs() if not envs: print("[!] No environments found. Please run install first.") sys.exit(1) active = self.get_active() if len(envs) == 1: return list(envs.keys())[0] print("\nMultiple environments detected:") keys = list(envs.keys()) for i, k in enumerate(keys): marker = "*" if k == active else " " print(f"{i+1}. [{marker}] {k} ({envs[k]['type']})") print(f"Default: {active}") choice = input("Select environment (Number) or Press Enter for Default: ").strip() if choice == "": return active try: idx = int(choice) - 1 if 0 <= idx < len(keys): return keys[idx] except: pass return active def load_config(): if not os.path.exists(CONFIG_PATH): print(f"Error: {CONFIG_PATH} not found.") sys.exit(1) with open(CONFIG_PATH, 'r') as f: return json.load(f) def get_gpu_info(): if sys.platform == "darwin": try: out = subprocess.check_output( ["system_profiler", "SPDisplaysDataType"], encoding="utf-8", stderr=subprocess.DEVNULL ) for line in out.split("\n"): if "Chip" in line: name = line.split(":", 1)[1].strip() return name, "APPLE" except: pass return "Apple Silicon (MPS)", "APPLE" try: name = subprocess.check_output( ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"], encoding='utf-8', stderr=subprocess.DEVNULL ).strip() return name, "NVIDIA" except: pass if IS_WIN: try: name = subprocess.check_output( "wmic path win32_VideoController get name", shell=True, encoding='utf-8', stderr=subprocess.DEVNULL ) name = name.replace("Name", "").strip().split('\n')[0].strip() if "Radeon" in name or "AMD" in name: return name, "AMD" return name, "INTEL" except: pass else: try: name = subprocess.check_output( "lspci | grep -i vga", shell=True, encoding='utf-8', stderr=subprocess.DEVNULL ) if "NVIDIA" in name: return name, "NVIDIA" if "AMD" in name or "Advanced Micro Devices" in name: return name, "AMD" except: pass return "Unknown", "UNKNOWN" def get_profile_key(gpu_name, vendor): g = gpu_name.upper() if vendor == "APPLE": return "MPS" if vendor == "NVIDIA": if "50" in g: return "RTX_50" if "40" in g: return "RTX_40" if "30" in g: return "RTX_30" if "20" in g or "QUADRO" in g: return "RTX_20" return "GTX_10" elif vendor == "AMD": if any(x in g for x in ["7600", "7700", "7800", "7900"]): return "AMD_GFX110X" if any(x in g for x in ["7000", "Z1", "PHOENIX"]): return "AMD_GFX1151" if any(x in g for x in ["8000", "STRIX", "1201"]): return "AMD_GFX1201" return "AMD_GFX110X" return "RTX_40" def get_os_key(): if sys.platform == "darwin": return "macos" return "win" if IS_WIN else "linux" def resolve_cmd(cmd_entry): if isinstance(cmd_entry, dict): return cmd_entry.get(get_os_key()) return cmd_entry def run_cmd(cmd, env_vars=None): if not cmd: return if "&&" in cmd and not IS_WIN: print(f"\n>>> Running (Shell): {cmd}") custom_env = os.environ.copy() if env_vars: custom_env.update(env_vars) subprocess.run(cmd, shell=True, check=True, env=custom_env) return print(f"\n>>> Running: {cmd}") custom_env = os.environ.copy() if env_vars: for k, v in env_vars.items(): print(f" [ENV SET] {k}={v}") custom_env[k] = v subprocess.run(cmd, shell=True, check=True, env=custom_env) def run_pip_component(pip, cmd): if not cmd: return run_cmd(cmd.format(pip=pip) if "{pip}" in cmd else f"{pip} {cmd}") def install_plugin_requirements(pip_cmd): plugins_dir = "plugins" if os.path.exists(plugins_dir) and os.path.isdir(plugins_dir): for entry in os.listdir(plugins_dir): plugin_req = os.path.join(plugins_dir, entry, "requirements.txt") if os.path.isfile(plugin_req): print(f"\n[*] Installing requirements for plugin '{entry}'...") run_cmd(f"{pip_cmd} -r \"{plugin_req}\"") def get_env_details(name, env_data): env_type = env_data["type"] dir_name = env_data["path"] entry = ENV_TEMPLATES[env_type] py_exec = entry['run'].format(dir=dir_name).strip('"') full_cmd = [py_exec, "-c", VERSION_CHECK_SCRIPT] try: output = subprocess.check_output(full_cmd, encoding='utf-8', stderr=subprocess.DEVNULL) data = {k: v for k, v in [x.split('=') for x in output.strip().split('||')]} data['path'] = dir_name data['type'] = env_type return data except Exception as e: return {'error': str(e), 'type': env_type, 'path': dir_name} def show_status(): manager = EnvsManager() print("\n" + "="*90) print(f"{'INSTALLED ENVIRONMENTS & VERSIONS':^90}") print("="*90) envs = manager.list_envs() active = manager.get_active() if not envs: print(" No environments installed.") print("="*90) return print(f"{'NAME':<15} | {'TYPE':<5} | {'PYTHON':<8} | {'TORCH':<15} | {'TRITON':<9} | {'SAGE':<10} | {'SPARGE':<10} | {'FLASH':<10}") print("-" * 90) for name, data in envs.items(): details = get_env_details(name, data) marker = "*" if name == active else " " display_name = f"[{marker}] {name}" if 'error' in details: print(f"{display_name:<15} | {data['type']:<5} | [Error reading environment]") continue print(f"{display_name:<15} | {data['type']:<5} | " f"{details.get('python','?'):<8} | " f"{details.get('torch','?'):<15} | " f"{details.get('triton','?'):<9} | " f"{details.get('sageattention','?'):<10} | " f"{details.get('spas_sage_attn','?'):<10} | " f"{details.get('flash_attn','?'):<10}") print("-" * 90) print(f" * = Active Environment") print("="*90 + "\n") def install_logic(env_name, env_type, env_path, py_k, torch_k, triton_k, sage_k, sparge_k, flash_k, kernel_list, config): template = ENV_TEMPLATES[env_type] target_py_ver = config['components']['python'][py_k]['ver'] print(f"\n[1/3] Preparing Environment: {env_name} ({env_type})...") if env_type != "none": if env_type == "venv": py_ver_short = ".".join(target_py_ver.split(".")[:2]) if IS_WIN: create_cmd = f"py -{py_ver_short} -m venv \"{env_path}\"" else: create_cmd = f"python{py_ver_short} -m venv \"{env_path}\"" else: create_cmd = template["create"].format( ver=target_py_ver, dir=env_path, sys_py=sys.executable ) if create_cmd: run_cmd(create_cmd) pip = template["install"].format(dir=env_path) print(f"\n[2/3] Installing Torch: {config['components']['torch'][torch_k]['label']}...") torch_cmd = resolve_cmd(config['components']['torch'][torch_k]['cmd']) run_cmd(f"{pip} {torch_cmd}") print(f"\n[3/3] Installing Requirements & Extras...") run_cmd(f"{pip} -r requirements.txt") if triton_k: cmd = resolve_cmd(config['components']['triton'][triton_k]['cmd']) if cmd: run_cmd(f"{pip} {cmd}") if sage_k: cmd = resolve_cmd(config['components']['sage'][sage_k]['cmd']) if cmd.startswith("http") or cmd.startswith("sageattention"): run_cmd(f"{pip} {cmd}") else: if env_type == "venv" or env_type == "uv": act = f". \"{env_path}/bin/activate\" && " if not IS_WIN else "" run_cmd(f"{act}{cmd}") elif env_type == "conda": pass if sparge_k: cmd = resolve_cmd(config['components']['sparge'][sparge_k]['cmd']) if cmd: run_pip_component(pip, cmd) if flash_k: cmd = resolve_cmd(config['components']['flash'][flash_k]['cmd']) if cmd: run_cmd(f"{pip} {cmd}") for k in kernel_list: if k in config['components']['kernels']: cmd = resolve_cmd(config['components']['kernels'][k]['cmd']) if cmd: run_cmd(f"{pip} {cmd}") install_plugin_requirements(pip) def menu(title, options, recommended_key=None): print(f"\n--- {title} ---") keys = list(options.keys()) for i, k in enumerate(keys): rec = " [RECOMMENDED FOR YOUR GPU]" if k == recommended_key else "" print(f"{i+1}. {options[k]['label']}{rec}") choice = input(f"Select option (Enter for Recommended): ") if choice == "" and recommended_key: return recommended_key try: return keys[int(choice)-1] except: return recommended_key def do_install_interactive(env_type, config, detected_key): manager = EnvsManager() create_wgp_config(detected_key, config) default_name = f"env_{env_type}" if env_type != "none" else "system" print(f"\n--- Configuration for {env_type} ---") name = input(f"Enter a name for this environment (Default: {default_name}): ").strip() if not name: name = default_name cwd = os.getcwd() path = os.path.join(cwd, name) if env_type != "none" else "" if name in manager.list_envs(): print(f"\n[!] Warning: Environment '{name}' already exists in registry.") choice = input("Do you want to overwrite it? (This will delete the old folder) [y/N]: ").lower() if choice != 'y': return manager.remove_env(name) elif os.path.exists(path) and env_type != "none": print(f"\n[!] Warning: Directory '{path}' exists but is not registered.") choice = input("Do you want to overwrite this directory? [y/N]: ").lower() if choice != 'y': return try: shutil.rmtree(path) except: pass print("\n--- Select Install Mode ---") print("1. Autoselect (Based on your GPU)") print("2. Manual Selection") print("3. Use Latest") mode = input("Select option (1-3) [Default: 1]: ").strip() if mode == "2": base = config['gpu_profiles'][detected_key] py_k = menu("Python Version", config['components']['python'], base['python']) torch_k = menu("Torch Version", config['components']['torch'], base['torch']) triton_k = menu("Triton", config['components']['triton'], base['triton']) sage_k = menu("Sage Attention", config['components']['sage'], base['sage']) sparge_k = menu("Sparge Attention", config['components']['sparge'], base.get('sparge')) flash_k = menu("Flash Attention", config['components']['flash'], base['flash']) kernels = base['kernels'] install_logic(name, env_type, path, py_k, torch_k, triton_k, sage_k, sparge_k, flash_k, kernels, config) elif mode == "3": p = config['gpu_profiles']['RTX_50'] install_logic(name, env_type, path, p['python'], p['torch'], p['triton'], p['sage'], p.get('sparge'), p.get('flash'), p['kernels'], config) else: p = config['gpu_profiles'][detected_key] install_logic(name, env_type, path, p['python'], p['torch'], p['triton'], p['sage'], p.get('sparge'), p.get('flash'), p['kernels'], config) manager.add_env(name, env_type, path) if len(manager.list_envs()) > 1: choice = input(f"\nDo you want to make '{name}' the active environment? [Y/n]: ").lower() if choice != 'n': manager.set_active(name) else: print(f"\n[*] '{name}' is the only environment, setting as active.") manager.set_active(name) def do_install_auto(env_type, config, detected_key): manager = EnvsManager() create_wgp_config(detected_key, config) name = f"env_{env_type}" if env_type != "none" else "system" cwd = os.getcwd() path = os.path.join(cwd, name) if env_type != "none" else "" if name in manager.list_envs(): manager.remove_env(name) elif os.path.exists(path) and env_type != "none": try: shutil.rmtree(path) except: pass print(f"\n[*] Starting Automatic Install (Hardware Profile: {detected_key})...") p = config['gpu_profiles'][detected_key] install_logic(name, env_type, path, p['python'], p['torch'], p['triton'], p['sage'], p.get('sparge'), p.get('flash'), p['kernels'], config) manager.add_env(name, env_type, path) manager.set_active(name) print(f"\n[*] Automatic Install Complete! '{name}' is now active.") def open_terminal(): manager = EnvsManager() env_name = manager.get_active() if not env_name: print("[!] No active environment. Please select or install one first.") input("Press Enter...") return env_data = manager.list_envs().get(env_name) if not env_data: print(f"[!] Could not find environment data for '{env_name}'.") return e_type = env_data["type"] e_path = env_data["path"] print(f"\n[*] Spawning interactive terminal for '{env_name}'...") print(f"[*] (Type 'exit' when you are done to return to the menu)\n") if IS_WIN: if e_type in ["venv", "uv"]: act_bat = os.path.join(e_path, 'Scripts', 'activate.bat') subprocess.run(f'cmd.exe /K "{act_bat}"') elif e_type == "conda": conda_bat = "conda.bat" if not shutil.which("conda"): user = os.environ.get("USERPROFILE", "") paths = [ os.path.join(user, "Miniconda3", "condabin", "conda.bat"), os.path.join(user, "Anaconda3", "condabin", "conda.bat"), r"C:\ProgramData\Miniconda3\condabin\conda.bat" ] for p in paths: if os.path.exists(p): conda_bat = p break subprocess.run(f'cmd.exe /K "{conda_bat}" activate "{e_path}"') else: subprocess.run('cmd.exe /K') else: rc_cmd = "if [ -f ~/.bashrc ]; then source ~/.bashrc; fi\n" if e_type in ["venv", "uv"]: rc_cmd += f"source '{os.path.join(e_path, 'bin', 'activate')}'\n" elif e_type == "conda": rc_cmd += ( "if command -v conda >/dev/null 2>&1; then eval \"$(conda shell.bash hook)\"; " "else for base in \"$HOME/miniconda3\" \"$HOME/anaconda3\" \"/opt/miniconda3\" \"/opt/anaconda3\"; do " "if [ -f \"$base/etc/profile.d/conda.sh\" ]; then source \"$base/etc/profile.d/conda.sh\"; break; fi; done; fi\n" f"conda activate '{e_path}'\n" ) linux_shell_cmd = f"bash --rcfile <(cat << 'EOF_WAN2GP'\n{rc_cmd}EOF_WAN2GP\n)" subprocess.run(linux_shell_cmd, shell=True, executable='/bin/bash') def switch_git_branch(): if not os.path.exists(".git"): print("[!] Not a git repository. Cannot switch branches.") input("Press Enter...") return print("\n[*] Fetching latest branches from remote...") try: subprocess.run(["git", "fetch", "--prune"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) out = subprocess.check_output(["git", "branch", "-a"], encoding='utf-8') except Exception as e: print(f"[!] Git error: {e}") input("Press Enter...") return branches = set() active_branch = "" for line in out.splitlines(): line = line.strip() if not line: continue if "->" in line: continue is_active = line.startswith("*") b = line.lstrip("* ").strip() if b.startswith("remotes/origin/"): b = b[len("remotes/origin/"):] elif b.startswith("remotes/"): b = "/".join(b.split("/")[2:]) if b and b != "HEAD": branches.add(b) if is_active: active_branch = b branch_list = sorted(list(branches)) if not branch_list: print("[!] No branches found.") input("Press Enter...") return print("\n--- Available Branches ---") for i, b in enumerate(branch_list): marker = "*" if b == active_branch else " " print(f"{i+1}. [{marker}] {b}") val = input(f"\nEnter number or name of branch (Default: {active_branch}): ").strip() if not val: return target = val if val.isdigit(): idx = int(val) - 1 if 0 <= idx < len(branch_list): target = branch_list[idx] if target == active_branch: print(f"[*] Already on '{target}'.") input("Press Enter...") return print(f"[*] Switching to '{target}'...") try: subprocess.run(["git", "checkout", target], check=True, capture_output=True, text=True) print(f"[*] Successfully switched to '{target}'.") except subprocess.CalledProcessError as e: err_msg = e.stderr.lower() if e.stderr else "" if "overwritten" in err_msg or "please commit your changes or stash them" in err_msg: print(f"\n[!] Failed to switch to '{target}' due to uncommitted changes.") print("How would you like to handle this?") print(" 1. Carry changes over to new branch (Not recommended)") print(" 2. Stash changes") print(" 3. Discard modifications") print(" 4. Cancel") opt = input("\nSelect option (1-4): ").strip() if opt == "1": try: print("\n[*] Stashing changes...") subprocess.run(["git", "stash", "push", "-m", f"Auto-stash before carrying to {target}"], check=True) subprocess.run(["git", "checkout", target], check=True) print(f"[*] Switched to '{target}'. Reapplying changes...") pop_res = subprocess.run(["git", "stash", "pop"]) if pop_res.returncode != 0: print("\n[!] Note: There were merge conflicts when reapplying your changes.") print(" The changes are still saved in your stash, but you will need to resolve conflicts in the files.") else: print("[*] Successfully carried changes over.") except Exception as inner_e: print(f"[!] Failed to carry changes: {inner_e}") elif opt == "2": try: print("\n[*] Stashing changes...") subprocess.run(["git", "stash", "push", "-m", f"Auto-stash before switching to {target}"], check=True) subprocess.run(["git", "checkout", target], check=True) print(f"[*] Successfully set changes aside and switched to '{target}'.") except Exception as inner_e: print(f"[!] Failed to stash and switch: {inner_e}") elif opt == "3": try: print("\n[*] Discarding tracked changes...") subprocess.run(["git", "checkout", "-f", target], check=True) print(f"[*] Successfully switched to '{target}'.") except Exception as inner_e: print(f"[!] Failed to force switch: {inner_e}") else: print("[*] Cancelled.") else: print(f"[!] Error switching branch:\n{e.stderr.strip() if e.stderr else str(e)}") except Exception as e: print(f"[!] Error: {e}") input("Press Enter...") def manage_git_stashes(): if not os.path.exists(".git"): print("[!] Not a git repository.") input("Press Enter...") return try: out = subprocess.check_output(["git", "stash", "list"], encoding='utf-8') except Exception as e: print(f"[!] Git error: {e}") input("Press Enter...") return stashes = [line for line in out.splitlines() if line.strip()] if not stashes: print("[*] No stashed changes found. You're all clean!") input("Press Enter...") return print("\n--- Saved Stashes ---") for i, s in enumerate(stashes): print(f"{i+1}. {s}") print("\nOptions:") print(" A. Apply a stash (Restore changes but keep the stash)") print(" P. Pop a stash (Restore changes and delete the stash)") print(" D. Delete a stash") print(" C. Cancel") choice = input("\nSelect action (A/P/D/C): ").strip().upper() if choice not in ["A", "P", "D"]: return idx_str = input(f"Enter stash number (1-{len(stashes)}): ").strip() if not idx_str.isdigit() or not (1 <= int(idx_str) <= len(stashes)): print("[!] Invalid stash number.") input("Press Enter...") return stash_ref = f"stash@{{{int(idx_str)-1}}}" try: if choice == "A": print(f"\n[*] Applying {stash_ref}...") subprocess.run(["git", "stash", "apply", stash_ref]) elif choice == "P": print(f"\n[*] Popping {stash_ref}...") subprocess.run(["git", "stash", "pop", stash_ref]) elif choice == "D": conf = input(f"Are you sure you want to delete {stash_ref}? (y/n): ").strip().lower() if conf == 'y': subprocess.run(["git", "stash", "drop", stash_ref]) print(f"[*] Dropped {stash_ref}.") except Exception as e: print(f"\n[!] Command returned an error or warning (you may need to resolve file conflicts manually).") input("\nPress Enter...") def do_manage(): manager = EnvsManager() while True: os.system('cls' if IS_WIN else 'clear') print("==========================================================================================") print(f"{'ENVIRONMENT MANAGER':^90}") print("==========================================================================================") envs = manager.list_envs() active = manager.get_active() keys = list(envs.keys()) if not envs: print(" No environments installed.") else: for name in keys: data = envs[name] status = "(Active)" if name == active else "" print(f" - {name:<15} [{data['type']}] {status}") print("------------------------------------------------------------------------------------------") print("1. Set Active Environment") print("2. Delete Environment") print("3. Add Existing Environment") print("4. List Environment Details") print("5. Open Terminal in Active Environment") print("6. Switch Git Branch") print("7. Manage Git Stashes") print("8. Exit") choice = input("\nSelect option: ") if choice == "1": if not keys: input("No environments to activate. Press Enter...") continue print("\nAvailable Environments:") for i, k in enumerate(keys): print(f" {i+1}. {k}") val = input("\nEnter name or number of environment to activate: ").strip() if val.isdigit() and 1 <= int(val) <= len(keys): name = keys[int(val)-1] else: name = val manager.set_active(name) input("Press Enter...") elif choice == "2": if not keys: input("No environments to delete. Press Enter...") continue print("\nAvailable Environments:") for i, k in enumerate(keys): print(f" {i+1}. {k}") val = input("\nEnter name or number of environment to DELETE: ").strip() if val.isdigit() and 1 <= int(val) <= len(keys): name = keys[int(val)-1] else: name = val if name in envs: conf = input(f"Are you sure you want to delete '{name}' and its files? (y/n): ") if conf.lower() == 'y': manager.remove_env(name) input("Deleted. Press Enter...") else: print(f"[!] Environment '{name}' not found.") input("Press Enter...") elif choice == "3": path = input("Enter the path to the existing environment folder: ").strip() if not os.path.exists(path): print("[!] Error: Path does not exist.") else: name = input("Enter a nickname for this environment: ").strip() if not name: name = os.path.basename(path.rstrip(os.sep)) print("\nSelect Environment Type:") print("1. venv") print("2. uv") print("3. conda") t_choice = input("Choice (Default 1): ") e_type = "uv" if t_choice == "2" else "conda" if t_choice == "3" else "venv" manager.add_env(name, e_type, os.path.abspath(path)) print(f"[*] Registered '{name}' at {os.path.abspath(path)}") input("Press Enter...") elif choice == "4": show_status() input("Press Enter...") elif choice == "5": open_terminal() elif choice == "6": switch_git_branch() elif choice == "7": manage_git_stashes() elif choice == "8": break def do_upgrade(config): manager = EnvsManager() print("\n" + "="*90) print(f"{'WAN2GP MANUAL COMPONENT UPGRADE':^90}") print("="*90) env_name = manager.resolve_target_env() env_data = manager.list_envs()[env_name] gpu_name, vendor = get_gpu_info() rec = config['gpu_profiles'][get_profile_key(gpu_name, vendor)] py_k = menu("Python Version", config['components']['python'], rec['python']) torch_k = menu("Torch Version", config['components']['torch'], rec['torch']) triton_k = menu("Triton", config['components']['triton'], rec['triton']) sage_k = menu("Sage Attention", config['components']['sage'], rec['sage']) sparge_k = menu("Sparge Attention", config['components']['sparge'], rec.get('sparge')) flash_k = menu("Flash Attention", config['components']['flash'], rec['flash']) install_logic(env_name, env_data['type'], env_data['path'], py_k, torch_k, triton_k, sage_k, sparge_k, flash_k, rec['kernels'], config) def get_system_specs(): ram_gb = 0 vram_gb = 0 if IS_WIN: try: out = subprocess.check_output( ["powershell", "-NoProfile", "-Command", "(Get-CimInstance Win32_ComputerSystem).TotalPhysicalMemory"], encoding='utf-8', stderr=subprocess.DEVNULL ).strip() if out: ram_gb = int(out) / (1024**3) except: try: out = subprocess.check_output( "wmic computersystem get TotalPhysicalMemory /value", shell=True, encoding='utf-8', stderr=subprocess.DEVNULL ) for line in out.splitlines(): if "TotalPhysicalMemory=" in line: ram_gb = int(line.split('=')[1]) / (1024**3) break except: pass else: try: with open('/proc/meminfo', 'r') as f: for line in f: if 'MemTotal' in line: kb_val = float(line.split()[1]) ram_gb = kb_val / (1024**2) break except: pass if ram_gb == 0: print("[!] Warning: Could not detect System RAM. Defaulting to 16GB.") ram_gb = 16 try: out = subprocess.check_output( ["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader,nounits"], encoding='utf-8', stderr=subprocess.DEVNULL ).strip() vram_gb = float(out.split('\n')[0]) / 1024 except: print("[!] Warning: Could not detect VRAM via nvidia-smi. Defaulting to 8GB.") vram_gb = 8 return ram_gb, vram_gb def create_wgp_config(profile_key, config_data): WGP_CONFIG_FILE = "wgp_config.json" if os.path.exists(WGP_CONFIG_FILE): return print("\n[*] Auto-generating wgp_config.json based on hardware...") ram, vram = get_system_specs() print(f" Detected: {int(ram)}GB RAM / {int(vram)}GB VRAM") has_high_ram = ram > 60 has_mid_ram = ram > 30 has_huge_vram = vram > 22 has_high_vram = vram > 11 pid = 5 if has_high_ram and has_huge_vram: pid = 1 elif has_high_ram: pid = 2 elif has_mid_ram and has_huge_vram: pid = 3 elif has_mid_ram and has_high_vram: pid = 4 else: pid = 5 prof_settings = config_data['gpu_profiles'].get(profile_key, {}) attn_mode = prof_settings.get("attention", "") if not attn_mode: if "50" in profile_key or "40" in profile_key or "30" in profile_key: attn_mode = "sage2" elif "20" in profile_key: attn_mode = "sage" compile_mode = "" triton_key = prof_settings.get('triton') if triton_key and triton_key != "none": compile_mode = "transformer" config_out = { "attention_mode": attn_mode, "compile": compile_mode, "video_profile": pid, "image_profile": pid, "audio_profile": pid, } try: with open(WGP_CONFIG_FILE, 'w') as f: json.dump(config_out, f, indent=4) print(f" Created config with Profile {pid}, Attention: '{attn_mode}', Compile: '{compile_mode}'") except Exception as e: print(f"[!] Error writing config: {e}") def inject_system_paths(): if not IS_WIN: return paths = [] user = os.environ.get("USERPROFILE", "") local_app = os.environ.get("LOCALAPPDATA", "") appdata = os.environ.get("APPDATA", "") for base in [os.path.join(user, "Miniconda3"), os.path.join(user, "Anaconda3"), r"C:\ProgramData\Miniconda3"]: c_bin = os.path.join(base, "condabin") if os.path.exists(c_bin): paths.extend([c_bin, os.path.join(base, "Scripts"), base]) break if user: paths.append(os.path.join(user, ".local", "bin")) if appdata: paths.append(os.path.join(appdata, "uv", "bin")) if local_app: paths.extend([ os.path.join(local_app, "Programs", "Python", "PyManager"), os.path.join(local_app, "Programs", "Python", "Python311", "Scripts") ]) current_path = os.environ.get("PATH", "") for p in paths: if p and os.path.exists(p) and p not in current_path: current_path = f"{p};{current_path}" os.environ["PATH"] = current_path def repair_git_repo(): print("[*] Repairing WAN2GP repository...") if not os.path.exists(".git"): run_cmd("git init") try: subprocess.run(["git", "remote", "add", "origin", "https://github.com/deepbeepmeep/Wan2GP.git"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except: pass run_cmd("git fetch origin") try: subprocess.run(["git", "rev-parse", "--verify", "origin/main"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) default_branch = "main" except subprocess.CalledProcessError: default_branch = "master" print(f"[*] Force resetting local files to match origin/{default_branch}...") run_cmd(f"git reset --hard origin/{default_branch}") run_cmd(f"git branch -M {default_branch}") run_cmd(f"git branch --set-upstream-to=origin/{default_branch} {default_branch}") if __name__ == "__main__": inject_system_paths() parser = argparse.ArgumentParser() parser.add_argument("mode", choices=["install", "update", "upgrade", "status", "manage", "get_env_info"]) parser.add_argument("--env", default="venv", help="Type of env for install (venv, uv, conda, none)") parser.add_argument("--auto", action="store_true", help="Run 1-click automatic install") args = parser.parse_args() cfg = load_config() if args.mode == "get_env_info": manager = EnvsManager() active = manager.get_active() if not active or not manager.list_envs().get(active): sys.exit(1) env_data = manager.list_envs()[active] print(f"ENV_INFO|{env_data['type']}|{env_data['path']}") sys.exit(0) if args.mode == "status": show_status() sys.exit(0) if args.mode == "manage": do_manage() sys.exit(0) gpu_name, vendor = get_gpu_info() profile_key = get_profile_key(gpu_name, vendor) profile = cfg['gpu_profiles'][profile_key] if args.mode == "install": print(f"Hardware Detected: {gpu_name} ({vendor})") if args.auto: do_install_auto(args.env, cfg, profile_key) else: do_install_interactive(args.env, cfg, profile_key) elif args.mode == "update": manager = EnvsManager() env_name = manager.resolve_target_env() env_data = manager.list_envs()[env_name] needs_install = False if not os.path.exists(".git"): print("[*] No .git folder found.") repair_git_repo() needs_install = True else: print("[*] Checking for updates...") try: old_head = subprocess.check_output(["git", "rev-parse", "HEAD"], encoding='utf-8', stderr=subprocess.DEVNULL).strip() except: old_head = "" try: subprocess.run(["git", "pull"], check=True) new_head = subprocess.check_output(["git", "rev-parse", "HEAD"], encoding='utf-8', stderr=subprocess.DEVNULL).strip() if old_head != new_head or not old_head: needs_install = True except subprocess.CalledProcessError: print("\n[!] 'git pull' failed.") print("[*] Attempting automatic recovery...") repair_git_repo() needs_install = True if needs_install: print("\n[*] Updates found. Installing/Verifying requirements...") install_fmt = ENV_TEMPLATES[env_data['type']]['install'] pip_cmd = install_fmt.format(dir=env_data['path']) run_cmd(f"{pip_cmd} -r requirements.txt") else: print("\n[*] Code is already up to date. Skipping requirements installation.") elif args.mode == "upgrade": do_upgrade(cfg)