#!/usr/bin/env python3
"""
Advanced GPU Fan Controller

Provides sophisticated fan control with multiple profiles, safety features,
and comprehensive logging. Supports temperature-based curves, manual override,
and automatic fallback modes.
"""

import time
import os
import sys
import json
import logging
import signal
import argparse
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
from pathlib import Path

from gpu_monitoring import GPUManager, GPUStatus

logger = logging.getLogger(__name__)


class FanMode(Enum):
    """Fan control modes."""
    AUTO = "auto"
    MANUAL = "manual"
    OFF = "off"
    EMERGENCY = "emergency"


class ProfileType(Enum):
    """Types of fan control profiles."""
    SILENT = "silent"
    BALANCED = "balanced"
    PERFORMANCE = "performance"
    CUSTOM = "custom"


@dataclass
class FanProfile:
    """Fan control profile configuration.

    ``curve`` must contain ``min_temp``, ``max_temp``, ``min_pwm`` and
    ``max_pwm``; ``safety`` must contain ``emergency_temp``,
    ``emergency_pwm`` and ``max_fan_time``.

    Raises:
        ValueError: if a required curve or safety key is missing.
    """
    name: str
    profile_type: ProfileType
    description: str
    curve: Dict[str, float]
    safety: Dict[str, float]
    enabled: bool = True

    def __post_init__(self):
        # Validate curve parameters
        required_curve_keys = ['min_temp', 'max_temp', 'min_pwm', 'max_pwm']
        for key in required_curve_keys:
            if key not in self.curve:
                raise ValueError(f"Missing required curve parameter: {key}")

        # Validate safety parameters
        required_safety_keys = ['emergency_temp', 'emergency_pwm', 'max_fan_time']
        for key in required_safety_keys:
            if key not in self.safety:
                raise ValueError(f"Missing required safety parameter: {key}")


@dataclass
class FanStatus:
    """Snapshot of the fan state produced by one control-loop update."""
    mode: FanMode
    profile: str
    current_pwm: int
    target_pwm: int
    temperature: float
    last_update: float
    manual_override: bool = False
    emergency_mode: bool = False


class FanController:
    """Advanced GPU fan controller with multiple profiles and safety features."""

    def __init__(self, config_file: str = "config/fan_profiles.json"):
        """Create a controller and load (or create) its profiles.

        Args:
            config_file: Path of the JSON file holding the fan profiles.
        """
        self.config_file = config_file
        self.profiles: Dict[str, FanProfile] = {}
        self.current_profile: Optional[FanProfile] = None
        self.current_mode = FanMode.AUTO
        self.manual_pwm = 0
        self.running = False
        # Re-entrant on purpose: set_manual_pwm() calls set_mode() while
        # holding the lock, and stop() may run from a signal handler that
        # interrupts update_fan_control() in the same thread while it holds
        # the lock. A plain Lock deadlocks in both situations.
        self.lock = threading.RLock()

        # GPU management
        self.gpu_manager = GPUManager()
        self.gpu_name = None

        # Status tracking
        self.status: Optional[FanStatus] = None
        self.last_status_update = 0

        # Safety features
        self.emergency_temp = 85.0
        self.emergency_pwm = 255
        self.max_fan_time = 300  # 5 minutes
        self.fan_on_time = 0

        # Configuration
        self.update_interval = 2.0
        self.log_interval = 30.0
        self.last_log_time = 0

        # Callbacks
        self.status_callbacks = []

        # Load configuration
        self.load_profiles()

    def load_profiles(self):
        """Load fan control profiles from the configuration file.

        If the file does not exist, the built-in default profiles are created
        and written to it. If the file cannot be read or parsed, or contains
        no profiles, the built-in defaults are used in memory only. In every
        case the first enabled profile is then selected as the current one.
        """
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r') as f:
                    config_data = json.load(f)

                for profile_name, profile_data in config_data.items():
                    profile = FanProfile(
                        name=profile_data['name'],
                        profile_type=ProfileType(profile_data['profile_type']),
                        description=profile_data['description'],
                        curve=profile_data['curve'],
                        safety=profile_data['safety'],
                        enabled=profile_data.get('enabled', True)
                    )
                    self.profiles[profile_name] = profile

                logger.info(f"Loaded {len(self.profiles)} fan profiles")

                if not self.profiles:
                    # Without any profile calculate_target_pwm() returns 0,
                    # so fall back to the built-in set rather than running
                    # with no curve. The user's file is left untouched.
                    logger.warning("Configuration file contains no profiles; using built-in defaults")
                    self.create_default_profiles()
            else:
                # Create default profiles
                self.create_default_profiles()
                self.save_profiles()

        except Exception as e:
            logger.error(f"Error loading profiles: {e}")
            self.create_default_profiles()

        self._select_default_profile()

    def _select_default_profile(self) -> bool:
        """Make the first enabled profile current; return True on success."""
        # Profiles are looked up by their dictionary key, which is not
        # necessarily equal to the profile's display name.
        for key, profile in self.profiles.items():
            if profile.enabled and self.set_profile(key):
                logger.info(f"Set default profile: {profile.name}")
                return True

        logger.warning("No enabled fan profile available; automatic mode will request PWM 0")
        return False

    def create_default_profiles(self):
        """Replace the profile table with the built-in silent/balanced/performance set."""
        self.profiles = {
            "silent": FanProfile(
                name="Silent",
                profile_type=ProfileType.SILENT,
                description="Quiet operation with lower fan speeds",
                curve={
                    "min_temp": 40.0,
                    "max_temp": 65.0,
                    "min_pwm": 120,
                    "max_pwm": 220
                },
                safety={
                    "emergency_temp": 85.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300
                }
            ),
            "balanced": FanProfile(
                name="Balanced",
                profile_type=ProfileType.BALANCED,
                description="Balanced performance and noise",
                curve={
                    "min_temp": 38.0,
                    "max_temp": 60.0,
                    "min_pwm": 155,
                    "max_pwm": 255
                },
                safety={
                    "emergency_temp": 80.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300
                }
            ),
            "performance": FanProfile(
                name="Performance",
                profile_type=ProfileType.PERFORMANCE,
                description="Maximum cooling for high performance",
                curve={
                    "min_temp": 35.0,
                    "max_temp": 55.0,
                    "min_pwm": 180,
                    "max_pwm": 255
                },
                safety={
                    "emergency_temp": 75.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300
                }
            )
        }

        logger.info("Created default fan profiles")

    def save_profiles(self):
        """Save current profiles to the configuration file.

        Errors are logged, not raised.
        """
        try:
            # A bare filename has an empty dirname, and os.makedirs('')
            # raises, so only create the directory when there is one.
            config_dir = os.path.dirname(self.config_file)
            if config_dir:
                os.makedirs(config_dir, exist_ok=True)

            config_data = {}
            for name, profile in self.profiles.items():
                config_data[name] = {
                    'name': profile.name,
                    'profile_type': profile.profile_type.value,
                    'description': profile.description,
                    'curve': profile.curve,
                    'safety': profile.safety,
                    'enabled': profile.enabled
                }

            with open(self.config_file, 'w') as f:
                json.dump(config_data, f, indent=2)

            logger.info("Saved fan profiles to configuration file")

        except Exception as e:
            logger.error(f"Error saving profiles: {e}")

    def initialize(self) -> bool:
        """Initialize the GPU manager, pick the first GPU and take over its fan.

        Returns:
            True when the controller is ready to run, False otherwise (the
            reason is logged).
        """
        logger.info("Initializing fan controller...")

        # Initialize GPU manager
        if not self.gpu_manager.initialize():
            logger.error("Failed to initialize GPU manager")
            return False

        # Get first GPU
        gpus = self.gpu_manager.get_gpu_list()
        if not gpus:
            logger.error("No GPUs detected")
            return False

        self.gpu_name = gpus[0]
        logger.info(f"Using GPU: {self.gpu_name}")

        # Check permissions
        if not self.check_permissions():
            logger.error("Insufficient permissions for fan control")
            return False

        # Initialize fan
        self.set_fan_mode(FanMode.AUTO)
        self.set_pwm(0)

        logger.info("Fan controller initialized successfully")
        return True

    def _get_hwmon_path(self) -> Optional[str]:
        """Return the hwmon directory of the selected GPU, or None if unknown."""
        gpu_info = self.gpu_manager.get_gpu_info(self.gpu_name)
        if not gpu_info:
            return None
        return gpu_info[0]['hwmon_path']

    def check_permissions(self) -> bool:
        """Check if we have write permissions to the fan control files.

        This is a pure pre-flight check: it does not write to the files, so
        it does not change the fan state.
        """
        try:
            hwmon_path = self._get_hwmon_path()
            if not hwmon_path:
                return False

            pwm_file = os.path.join(hwmon_path, "pwm1")
            pwm_enable = os.path.join(hwmon_path, "pwm1_enable")

            for path in (pwm_enable, pwm_file):
                if not os.access(path, os.W_OK):
                    logger.debug(f"Permission check failed: {path} is not writable")
                    return False

            return True

        except Exception as e:
            logger.debug(f"Permission check failed: {e}")
            return False

    def set_profile(self, profile_name: str) -> bool:
        """Set the current fan control profile.

        Args:
            profile_name: Key of the profile in the profile table.

        Returns:
            True if the profile exists and is enabled, False otherwise.
        """
        with self.lock:
            if profile_name not in self.profiles:
                logger.error(f"Profile '{profile_name}' not found")
                return False

            profile = self.profiles[profile_name]
            if not profile.enabled:
                logger.error(f"Profile '{profile_name}' is disabled")
                return False

            self.current_profile = profile
            logger.info(f"Switched to profile: {profile.name}")
            return True

    def set_mode(self, mode: FanMode):
        """Set the fan control mode used by the control loop."""
        with self.lock:
            self.current_mode = mode
            logger.info(f"Set fan mode to: {mode.value}")

    def set_manual_pwm(self, pwm: int):
        """Set the manual PWM value (clamped to 0-255) and switch to MANUAL mode."""
        with self.lock:
            pwm = max(0, min(255, pwm))  # Clamp to valid range
            self.manual_pwm = pwm
            # Nested acquisition of self.lock; safe because it is an RLock.
            self.set_mode(FanMode.MANUAL)
            logger.info(f"Set manual PWM to: {pwm}")

    def set_fan_mode(self, mode: FanMode) -> bool:
        """Write the fan/PWM enable files for the given mode.

        FanMode.OFF writes '0' to both ``fan1_enable`` and ``pwm1_enable``;
        every other mode writes '1' to both.

        Returns:
            True on success, False if the hwmon path is unknown or a write
            fails (the error is logged).
        """
        try:
            hwmon_path = self._get_hwmon_path()
            if not hwmon_path:
                return False

            fan_enable = os.path.join(hwmon_path, "fan1_enable")
            pwm_enable = os.path.join(hwmon_path, "pwm1_enable")

            enable_value = '0' if mode == FanMode.OFF else '1'
            with open(fan_enable, 'w') as f:
                f.write(enable_value)
            with open(pwm_enable, 'w') as f:
                f.write(enable_value)

            return True

        except Exception as e:
            logger.error(f"Error setting fan mode: {e}")
            return False

    def set_pwm(self, pwm: int) -> bool:
        """Write a PWM value (clamped to 0-255) to the GPU's ``pwm1`` file.

        Returns:
            True on success, False if the hwmon path is unknown or the write
            fails (the error is logged).
        """
        try:
            hwmon_path = self._get_hwmon_path()
            if not hwmon_path:
                return False

            pwm_file = os.path.join(hwmon_path, "pwm1")

            pwm = max(0, min(255, pwm))  # Clamp to valid range

            with open(pwm_file, 'w') as f:
                f.write(str(int(pwm)))

            return True

        except Exception as e:
            logger.error(f"Error setting PWM: {e}")
            return False

    def calculate_target_pwm(self, temperature: float) -> int:
        """Calculate target PWM based on temperature and the current profile.

        Returns 0 when no profile is selected, the profile's emergency PWM at
        or above its emergency temperature, and otherwise a value linearly
        interpolated between ``min_pwm`` and ``max_pwm`` over the
        ``min_temp``..``max_temp`` range (clamped at both ends).
        """
        if not self.current_profile:
            return 0

        curve = self.current_profile.curve
        safety = self.current_profile.safety

        # Emergency temperature handling
        if temperature >= safety['emergency_temp']:
            return int(safety['emergency_pwm'])

        # Temperature-based curve calculation
        min_temp = curve['min_temp']
        max_temp = curve['max_temp']
        min_pwm = curve['min_pwm']
        max_pwm = curve['max_pwm']

        if temperature <= min_temp:
            return int(min_pwm)
        elif temperature >= max_temp:
            return int(max_pwm)
        else:
            # Linear interpolation. Reaching this branch implies
            # min_temp < temperature < max_temp, so temp_range is positive.
            temp_range = max_temp - min_temp
            pwm_range = max_pwm - min_pwm
            return int(min_pwm + ((temperature - min_temp) / temp_range) * pwm_range)

    def check_safety_limits(self, temperature: float, current_pwm: int) -> bool:
        """Check if safety limits are exceeded.

        Also maintains ``self.fan_on_time``: it grows by ``update_interval``
        on every call with ``current_pwm >= 250`` and resets to 0 otherwise.

        Returns:
            True if the temperature is at or above the profile's emergency
            temperature, or the fan has been at high speed for at least
            ``max_fan_time`` seconds; False otherwise or without a profile.
        """
        if not self.current_profile:
            return False

        safety = self.current_profile.safety

        # Emergency temperature check
        if temperature >= safety['emergency_temp']:
            return True

        # Maximum fan time check
        if current_pwm >= 250:  # High fan speed threshold
            self.fan_on_time += self.update_interval
            if self.fan_on_time >= safety['max_fan_time']:
                logger.warning(f"Fan has been at high speed for {safety['max_fan_time']} seconds")
                return True
        else:
            self.fan_on_time = 0

        return False

    def update_fan_control(self):
        """Run one control step: read the GPU, pick a PWM, apply and report it.

        Returns:
            True if the step completed, False if the GPU status was
            unavailable or an error occurred (logged).
        """
        try:
            # Get current GPU status
            status_dict = self.gpu_manager.get_status(self.gpu_name)
            gpu_status = status_dict.get(self.gpu_name)

            if not gpu_status:
                logger.warning("Failed to get GPU status")
                return False

            temperature = gpu_status.temperature
            current_time = time.time()

            # Calculate target PWM
            target_pwm = 0
            emergency_mode = False

            with self.lock:
                if self.current_mode == FanMode.MANUAL:
                    target_pwm = self.manual_pwm
                elif self.current_mode == FanMode.OFF:
                    target_pwm = 0
                else:  # AUTO mode (EMERGENCY follows the same path)
                    target_pwm = self.calculate_target_pwm(temperature)

                    # Check safety limits
                    if self.check_safety_limits(temperature, target_pwm):
                        target_pwm = int(self.current_profile.safety['emergency_pwm'])
                        emergency_mode = True
                        self.current_mode = FanMode.EMERGENCY
                    elif self.current_mode == FanMode.EMERGENCY:
                        # The condition that triggered the emergency has
                        # cleared; do not stay latched in EMERGENCY forever.
                        self.current_mode = FanMode.AUTO
                        logger.info("Safety condition cleared, returning to auto mode")

            # Apply PWM
            if self.set_pwm(target_pwm):
                # Update status
                self.status = FanStatus(
                    mode=self.current_mode,
                    profile=self.current_profile.name if self.current_profile else "unknown",
                    current_pwm=target_pwm,
                    target_pwm=target_pwm,
                    temperature=temperature,
                    last_update=current_time,
                    manual_override=(self.current_mode == FanMode.MANUAL),
                    emergency_mode=emergency_mode
                )

                # Log status periodically
                if current_time - self.last_log_time >= self.log_interval:
                    pwm_percent = int(target_pwm * 100 / 255)
                    logger.info(f"Temp: {temperature:.1f}°C | PWM: {target_pwm} ({pwm_percent}%) | Mode: {self.current_mode.value}")
                    self.last_log_time = current_time

                # Notify callbacks
                self._notify_status_callbacks()

            return True

        except Exception as e:
            logger.error(f"Error updating fan control: {e}")
            return False

    def add_status_callback(self, callback: Callable[[FanStatus], None]):
        """Add a callback function to be called when status updates."""
        self.status_callbacks.append(callback)

    def _notify_status_callbacks(self):
        """Call every registered callback with the latest status.

        A failing callback is logged and does not stop the others.
        """
        if self.status:
            for callback in self.status_callbacks:
                try:
                    callback(self.status)
                except Exception as e:
                    logger.error(f"Error in status callback: {e}")

    def run(self):
        """Main control loop: update the fan every ``update_interval`` seconds
        until ``running`` is cleared, Ctrl-C is pressed or an error escapes."""
        logger.info("Starting fan controller...")
        self.running = True

        try:
            while self.running:
                self.update_fan_control()
                time.sleep(self.update_interval)

        except KeyboardInterrupt:
            logger.info("Stopping fan controller...")
            self.running = False
        except Exception as e:
            logger.error(f"Fatal error in fan controller: {e}")
            self.running = False

    def stop(self):
        """Stop the control loop, switch to OFF mode and write PWM 0."""
        logger.info("Stopping fan controller...")
        self.running = False

        # Set fan to safe state
        # NOTE(review): this leaves pwm1 at 0 without touching pwm1_enable,
        # so after exit nothing appears to be driving the fan. Handing
        # control back to the driver's automatic mode may be safer -- confirm
        # against the target driver before changing.
        self.set_mode(FanMode.OFF)
        self.set_pwm(0)

    def get_status(self) -> Optional[FanStatus]:
        """Get the status from the last successful update, or None."""
        return self.status

    def get_profiles(self) -> Dict[str, FanProfile]:
        """Get a shallow copy of the profile table."""
        return self.profiles.copy()

    def add_profile(self, profile: FanProfile):
        """Add (or replace) a profile, keyed by ``profile.name``, and save."""
        with self.lock:
            self.profiles[profile.name] = profile
            self.save_profiles()
            logger.info(f"Added profile: {profile.name}")

    def remove_profile(self, profile_name: str):
        """Remove a profile by key and save; unknown keys are ignored."""
        with self.lock:
            if profile_name in self.profiles:
                del self.profiles[profile_name]
                self.save_profiles()
                logger.info(f"Removed profile: {profile_name}")


class FanControllerCLI:
    """Command-line interface for fan controller."""

    def __init__(self):
        self.controller = None

    def setup_logging(self, log_level: str):
        """Configure logging to the log file and stdout.

        If the log file cannot be opened (for example when not running as
        root), logging continues on stdout only instead of aborting.

        Args:
            log_level: Level name such as ``'INFO'``; unknown names fall back
                to INFO.
        """
        numeric_level = getattr(logging, log_level.upper(), logging.INFO)

        handlers = []
        file_error = None
        try:
            handlers.append(logging.FileHandler('/var/log/gpu_fan_control.log'))
        except OSError as e:
            file_error = e
        handlers.append(logging.StreamHandler(sys.stdout))

        logging.basicConfig(
            level=numeric_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=handlers
        )

        if file_error is not None:
            logger.warning(f"Could not open log file, logging to stdout only: {file_error}")

    def run(self):
        """Parse command-line arguments and run the fan controller."""
        parser = argparse.ArgumentParser(description='Advanced GPU Fan Controller')
        parser.add_argument('--profile', type=str, help='Fan profile to use')
        parser.add_argument('--manual-pwm', type=int, choices=range(0, 256),
                            help='Manual PWM value (0-255)')
        parser.add_argument('--config', type=str, default='config/fan_profiles.json',
                            help='Configuration file path')
        parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                            default='INFO', help='Log level')
        parser.add_argument('--list-profiles', action='store_true',
                            help='List available profiles')
        parser.add_argument('--daemon', action='store_true',
                            help='Run as daemon')

        args = parser.parse_args()

        # Setup logging
        self.setup_logging(args.log_level)

        # Initialize controller
        self.controller = FanController(args.config)

        if args.list_profiles:
            self.list_profiles()
            return

        if not self.controller.initialize():
            logger.error("Failed to initialize fan controller")
            sys.exit(1)

        # Apply command-line settings
        if args.profile:
            if not self.controller.set_profile(args.profile):
                logger.error(f"Failed to set profile: {args.profile}")
                sys.exit(1)

        if args.manual_pwm is not None:
            self.controller.set_manual_pwm(args.manual_pwm)

        # Setup signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # Run controller
        if args.daemon:
            logger.info("Running as daemon...")
            self.controller.run()
        else:
            try:
                self.controller.run()
            except KeyboardInterrupt:
                logger.info("Received interrupt signal")

    def list_profiles(self):
        """Print the available fan profiles.

        Uses the controller created by run() so that ``--config`` is
        honoured; falls back to a default controller when called standalone.
        """
        controller = self.controller if self.controller is not None else FanController()

        print("Available fan profiles:")
        for name, profile in controller.profiles.items():
            status = "✓" if profile.enabled else "✗"
            print(f"  {status} {name}: {profile.description}")

    def signal_handler(self, signum, frame):
        """Handle shutdown signals: stop the controller and exit with status 0."""
        logger.info(f"Received signal {signum}, shutting down...")
        if self.controller:
            self.controller.stop()
        sys.exit(0)


if __name__ == "__main__":
    cli = FanControllerCLI()
    cli.run()