| |
| """ |
| Advanced GPU Fan Controller |
| |
| Provides sophisticated fan control with multiple profiles, safety features, |
| and comprehensive logging. Supports temperature-based curves, manual override, |
| and automatic fallback modes. |
| """ |
|
|
| import time |
| import os |
| import sys |
| import json |
| import logging |
| import signal |
| import argparse |
| from typing import Dict, List, Optional, Callable |
| from dataclasses import dataclass, asdict |
| from enum import Enum |
| import threading |
| from pathlib import Path |
|
|
| from gpu_monitoring import GPUManager, GPUStatus |
|
|
# Module-level logger named after this module. Handlers and level are set up
# by FanControllerCLI.setup_logging() through logging.basicConfig().
logger = logging.getLogger(__name__)
|
|
|
|
class FanMode(Enum):
    """Fan control modes.

    The string value of each member is what the controller prints in its log
    messages (``mode.value``).
    """
    # Target PWM is computed from the active profile's temperature curve.
    AUTO = "auto"
    # Target PWM is the fixed value given to FanController.set_manual_pwm().
    MANUAL = "manual"
    # A target PWM of 0 is requested.
    OFF = "off"
    # Set by FanController.update_fan_control() when a safety limit is exceeded.
    EMERGENCY = "emergency"
|
|
|
|
class ProfileType(Enum):
    """Types of fan control profiles.

    The string value is the form stored under ``profile_type`` in the JSON
    configuration file and parsed back with ``ProfileType(value)`` on load.
    """
    SILENT = "silent"            # used by the built-in "silent" profile
    BALANCED = "balanced"        # used by the built-in "balanced" profile
    PERFORMANCE = "performance"  # used by the built-in "performance" profile
    CUSTOM = "custom"            # not used by any built-in profile
|
|
|
|
@dataclass
class FanProfile:
    """A named fan curve together with its safety thresholds.

    ``curve`` must contain ``min_temp``, ``max_temp``, ``min_pwm`` and
    ``max_pwm``; ``safety`` must contain ``emergency_temp``, ``emergency_pwm``
    and ``max_fan_time``. Construction fails with ``ValueError`` when any of
    these keys is absent. Only key presence is checked, not the values.
    """
    name: str
    profile_type: ProfileType
    description: str
    curve: Dict[str, float]
    safety: Dict[str, float]
    enabled: bool = True

    def __post_init__(self):
        """Reject a profile whose curve or safety mapping lacks a mandatory key.

        Raises:
            ValueError: Naming the first missing key. Curve keys are checked
                before safety keys, each group in the order listed below.
        """
        curve_keys = ('min_temp', 'max_temp', 'min_pwm', 'max_pwm')
        absent_curve_key = next(
            (name for name in curve_keys if name not in self.curve), None
        )
        if absent_curve_key is not None:
            raise ValueError(f"Missing required curve parameter: {absent_curve_key}")

        safety_keys = ('emergency_temp', 'emergency_pwm', 'max_fan_time')
        absent_safety_key = next(
            (name for name in safety_keys if name not in self.safety), None
        )
        if absent_safety_key is not None:
            raise ValueError(f"Missing required safety parameter: {absent_safety_key}")
|
|
|
|
@dataclass
class FanStatus:
    """Current fan status.

    A snapshot built by FanController.update_fan_control() after a PWM value
    has been written successfully, and handed to every status callback.
    """
    # Control mode in effect when the snapshot was taken.
    mode: FanMode
    # ``name`` of the active profile, or "unknown" when none is selected.
    profile: str
    # PWM value that was written; the controller stores the same number in
    # both ``current_pwm`` and ``target_pwm`` (no read-back is performed).
    current_pwm: int
    target_pwm: int
    # Temperature reported by the GPU status object for this update.
    temperature: float
    # ``time.time()`` timestamp of the update.
    last_update: float
    # True when ``mode`` is FanMode.MANUAL.
    manual_override: bool = False
    # True when this update was forced to the emergency PWM by a safety limit.
    emergency_mode: bool = False
|
|
|
|
class FanController:
    """Advanced GPU fan controller with multiple profiles and safety features.

    The controller reads the GPU temperature through ``GPUManager``, derives a
    target PWM value (0-255) from the active profile or a manual setting, and
    writes it to the ``pwm1`` file in the GPU's hwmon directory. Whatever the
    mode, the target is replaced by the emergency PWM when a safety limit is
    exceeded.
    """

    def __init__(self, config_file: str = "config/fan_profiles.json"):
        """Set up controller state and load (or create) the fan profiles.

        Args:
            config_file: Path of the JSON file profiles are read from and
                saved to.
        """
        self.config_file = config_file
        self.profiles = {}
        self.current_profile = None
        self.current_mode = FanMode.AUTO
        self.manual_pwm = 0
        self.running = False
        # Re-entrant on purpose: set_manual_pwm() calls set_mode() while it
        # holds the lock, and stop() may be invoked from a signal handler that
        # interrupts update_fan_control() in the same thread. A plain Lock
        # would deadlock in both situations.
        self.lock = threading.RLock()

        # GPU access; the GPU name is filled in by initialize().
        self.gpu_manager = GPUManager()
        self.gpu_name = None

        # Last published status snapshot.
        self.status = None
        self.last_status_update = 0

        # Fallback safety limits, used whenever no profile is active.
        self.emergency_temp = 85.0
        self.emergency_pwm = 255
        self.max_fan_time = 300
        # Seconds the requested PWM has continuously been >= 250.
        self.fan_on_time = 0

        # Timing, in seconds.
        self.update_interval = 2.0
        self.log_interval = 30.0
        self.last_log_time = 0

        self.status_callbacks = []

        self.load_profiles()

    def load_profiles(self):
        """Load fan control profiles from the configuration file.

        When the file does not exist the built-in defaults are created and
        saved. When loading fails for any reason the error is logged and the
        built-in defaults are used without saving. In every case the first
        enabled profile is then activated.
        """
        try:
            if os.path.exists(self.config_file):
                with open(self.config_file, 'r') as f:
                    config_data = json.load(f)

                for profile_name, profile_data in config_data.items():
                    self.profiles[profile_name] = FanProfile(
                        name=profile_data['name'],
                        profile_type=ProfileType(profile_data['profile_type']),
                        description=profile_data['description'],
                        curve=profile_data['curve'],
                        safety=profile_data['safety'],
                        enabled=profile_data.get('enabled', True)
                    )

                logger.info(f"Loaded {len(self.profiles)} fan profiles")
            else:
                self.create_default_profiles()
                self.save_profiles()

        except Exception as e:
            logger.error(f"Error loading profiles: {e}")
            self.create_default_profiles()

        # Done for every path: without an active profile automatic mode
        # computes a target PWM of 0.
        self._select_default_profile()

    def _select_default_profile(self) -> bool:
        """Activate the first enabled profile; return False if there is none."""
        # Profiles are looked up by dictionary key, which can differ from
        # profile.name (e.g. key "silent", name "Silent").
        for key, profile in self.profiles.items():
            if profile.enabled and self.set_profile(key):
                logger.info(f"Set default profile: {profile.name}")
                return True

        logger.warning("No enabled fan profile available")
        return False

    def create_default_profiles(self):
        """Replace all profiles with the built-in silent/balanced/performance set."""
        self.profiles = {
            "silent": FanProfile(
                name="Silent",
                profile_type=ProfileType.SILENT,
                description="Quiet operation with lower fan speeds",
                curve={
                    "min_temp": 40.0,
                    "max_temp": 65.0,
                    "min_pwm": 120,
                    "max_pwm": 220
                },
                safety={
                    "emergency_temp": 85.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300
                }
            ),
            "balanced": FanProfile(
                name="Balanced",
                profile_type=ProfileType.BALANCED,
                description="Balanced performance and noise",
                curve={
                    "min_temp": 38.0,
                    "max_temp": 60.0,
                    "min_pwm": 155,
                    "max_pwm": 255
                },
                safety={
                    "emergency_temp": 80.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300
                }
            ),
            "performance": FanProfile(
                name="Performance",
                profile_type=ProfileType.PERFORMANCE,
                description="Maximum cooling for high performance",
                curve={
                    "min_temp": 35.0,
                    "max_temp": 55.0,
                    "min_pwm": 180,
                    "max_pwm": 255
                },
                safety={
                    "emergency_temp": 75.0,
                    "emergency_pwm": 255,
                    "max_fan_time": 300
                }
            )
        }

        logger.info("Created default fan profiles")

    def save_profiles(self):
        """Save current profiles to the configuration file.

        Errors are logged, not raised.
        """
        try:
            # A bare filename has an empty dirname, and os.makedirs("") raises.
            config_dir = os.path.dirname(self.config_file)
            if config_dir:
                os.makedirs(config_dir, exist_ok=True)

            config_data = {
                name: {
                    'name': profile.name,
                    'profile_type': profile.profile_type.value,
                    'description': profile.description,
                    'curve': profile.curve,
                    'safety': profile.safety,
                    'enabled': profile.enabled
                }
                for name, profile in self.profiles.items()
            }

            with open(self.config_file, 'w') as f:
                json.dump(config_data, f, indent=2)

            logger.info("Saved fan profiles to configuration file")

        except Exception as e:
            logger.error(f"Error saving profiles: {e}")

    def initialize(self) -> bool:
        """Initialize the fan controller.

        Initializes the GPU manager, selects the first detected GPU, verifies
        that the fan control files are writable, then enables fan control and
        writes an initial PWM of 0.

        Returns:
            True on success; False when the GPU manager cannot be initialized,
            no GPU is detected, or the control files are not writable.
        """
        logger.info("Initializing fan controller...")

        if not self.gpu_manager.initialize():
            logger.error("Failed to initialize GPU manager")
            return False

        gpus = self.gpu_manager.get_gpu_list()
        if not gpus:
            logger.error("No GPUs detected")
            return False

        self.gpu_name = gpus[0]
        logger.info(f"Using GPU: {self.gpu_name}")

        if not self.check_permissions():
            logger.error("Insufficient permissions for fan control")
            return False

        # Initial hardware state. Failures are reported but are not fatal; the
        # control loop retries the PWM write on every update.
        if not self.set_fan_mode(FanMode.AUTO):
            logger.warning("Could not enable fan control during initialization")
        if not self.set_pwm(0):
            logger.warning("Could not write initial PWM value")

        logger.info("Fan controller initialized successfully")
        return True

    def _hwmon_path(self) -> Optional[str]:
        """Return the hwmon directory of the selected GPU, or None if unknown."""
        gpu_info = self.gpu_manager.get_gpu_info(self.gpu_name)
        if not gpu_info:
            return None
        return gpu_info[0]['hwmon_path']

    def check_permissions(self) -> bool:
        """Check if we have write permissions to fan control files.

        The check does not modify anything: it only asks the OS whether
        ``pwm1_enable`` and ``pwm1`` are writable. Writing test values into
        them would change the live fan state.

        Returns:
            True when both files are writable, False otherwise.
        """
        try:
            hwmon_path = self._hwmon_path()
            if hwmon_path is None:
                return False

            for filename in ("pwm1_enable", "pwm1"):
                control_file = os.path.join(hwmon_path, filename)
                if not os.access(control_file, os.W_OK):
                    logger.debug(f"Permission check failed: {control_file} is not writable")
                    return False

            return True

        except Exception as e:
            logger.debug(f"Permission check failed: {e}")
            return False

    def set_profile(self, profile_name: str) -> bool:
        """Set the current fan control profile.

        Args:
            profile_name: Key of the profile in ``self.profiles``.

        Returns:
            True when the profile exists and is enabled, False otherwise.
        """
        with self.lock:
            profile = self.profiles.get(profile_name)
            if profile is None:
                logger.error(f"Profile '{profile_name}' not found")
                return False

            if not profile.enabled:
                logger.error(f"Profile '{profile_name}' is disabled")
                return False

            self.current_profile = profile
            logger.info(f"Switched to profile: {profile.name}")
            return True

    def set_mode(self, mode: FanMode):
        """Set the fan control mode (controller state only, no hardware write)."""
        with self.lock:
            self.current_mode = mode
            logger.info(f"Set fan mode to: {mode.value}")

    def set_manual_pwm(self, pwm: int):
        """Set manual PWM value (0-255) and switch to manual mode.

        Values outside 0-255 are clamped.
        """
        with self.lock:
            pwm = max(0, min(255, pwm))
            self.manual_pwm = pwm
            # Nested acquisition of self.lock; safe because it is an RLock.
            self.set_mode(FanMode.MANUAL)
            logger.info(f"Set manual PWM to: {pwm}")

    def set_fan_mode(self, mode: FanMode) -> bool:
        """Enable or disable fan control in the GPU's hwmon directory.

        Writes '0' for ``FanMode.OFF`` and '1' for every other mode to
        ``fan1_enable`` (when that file exists) and to ``pwm1_enable``.

        Returns:
            True when the writes succeeded, False when the hwmon path is
            unknown or a write failed (the error is logged).
        """
        # NOTE(review): the meaning of 0/1 in pwm1_enable is driver-defined;
        # confirm against the hwmon documentation of the target GPU driver.
        try:
            hwmon_path = self._hwmon_path()
            if hwmon_path is None:
                return False

            value = '0' if mode == FanMode.OFF else '1'
            fan_enable = os.path.join(hwmon_path, "fan1_enable")
            pwm_enable = os.path.join(hwmon_path, "pwm1_enable")

            # A missing fan1_enable must not prevent pwm1_enable from being set.
            if os.path.exists(fan_enable):
                with open(fan_enable, 'w') as f:
                    f.write(value)
            else:
                logger.debug(f"{fan_enable} does not exist, skipping")

            with open(pwm_enable, 'w') as f:
                f.write(value)

            return True

        except Exception as e:
            logger.error(f"Error setting fan mode: {e}")
            return False

    def set_pwm(self, pwm: int) -> bool:
        """Write a PWM value to the GPU's ``pwm1`` file.

        Args:
            pwm: Requested value; clamped to 0-255 before writing.

        Returns:
            True when the value was written, False otherwise (error logged).
        """
        try:
            hwmon_path = self._hwmon_path()
            if hwmon_path is None:
                return False

            pwm_file = os.path.join(hwmon_path, "pwm1")
            pwm = max(0, min(255, pwm))

            with open(pwm_file, 'w') as f:
                f.write(str(int(pwm)))

            return True

        except Exception as e:
            logger.error(f"Error setting PWM: {e}")
            return False

    def calculate_target_pwm(self, temperature: float) -> int:
        """Calculate target PWM based on temperature and current profile.

        Returns 0 when no profile is active, the profile's emergency PWM at or
        above its emergency temperature, ``min_pwm`` at or below ``min_temp``,
        ``max_pwm`` at or above ``max_temp``, and a linear interpolation
        (truncated to int) in between.
        """
        if not self.current_profile:
            return 0

        curve = self.current_profile.curve
        safety = self.current_profile.safety

        if temperature >= safety['emergency_temp']:
            return int(safety['emergency_pwm'])

        min_temp = curve['min_temp']
        max_temp = curve['max_temp']
        min_pwm = curve['min_pwm']
        max_pwm = curve['max_pwm']

        if temperature <= min_temp:
            return int(min_pwm)
        if temperature >= max_temp:
            return int(max_pwm)

        # Here min_temp < temperature < max_temp, so the divisor is positive.
        fraction = (temperature - min_temp) / (max_temp - min_temp)
        return int(min_pwm + fraction * (max_pwm - min_pwm))

    def _safety_limits(self) -> Dict[str, float]:
        """Return the active profile's safety limits, or the controller defaults."""
        if self.current_profile:
            return self.current_profile.safety
        return {
            'emergency_temp': self.emergency_temp,
            'emergency_pwm': self.emergency_pwm,
            'max_fan_time': self.max_fan_time,
        }

    def check_safety_limits(self, temperature: float, current_pwm: int) -> bool:
        """Check if safety limits are exceeded.

        A limit is exceeded when the temperature has reached the emergency
        temperature, or when the requested PWM has been >= 250 for at least
        ``max_fan_time`` seconds. The high-speed timer advances by
        ``update_interval`` per call, so this method is expected to be called
        once per control cycle. Without an active profile the controller's own
        default limits apply.

        Returns:
            True when a limit is exceeded.
        """
        safety = self._safety_limits()

        if temperature >= safety['emergency_temp']:
            return True

        if current_pwm >= 250:
            self.fan_on_time += self.update_interval
            if self.fan_on_time >= safety['max_fan_time']:
                logger.warning(f"Fan has been at high speed for {safety['max_fan_time']} seconds")
                return True
        else:
            self.fan_on_time = 0

        return False

    def update_fan_control(self):
        """Run one control cycle: read temperature, pick a PWM, write it.

        Returns:
            True when the PWM was written and the status published, False when
            the GPU status is unavailable, the write failed, or an unexpected
            error occurred (logged).
        """
        try:
            status_dict = self.gpu_manager.get_status(self.gpu_name)
            gpu_status = status_dict.get(self.gpu_name)

            if not gpu_status:
                logger.warning("Failed to get GPU status")
                return False

            temperature = gpu_status.temperature
            current_time = time.time()
            emergency_mode = False

            with self.lock:
                if self.current_mode == FanMode.MANUAL:
                    target_pwm = self.manual_pwm
                elif self.current_mode == FanMode.OFF:
                    target_pwm = 0
                else:
                    # AUTO, and EMERGENCY while waiting for limits to clear.
                    target_pwm = self.calculate_target_pwm(temperature)

                if self.check_safety_limits(temperature, target_pwm):
                    target_pwm = int(self._safety_limits()['emergency_pwm'])
                    emergency_mode = True
                    self.current_mode = FanMode.EMERGENCY
                elif self.current_mode == FanMode.EMERGENCY:
                    # The PWM is already curve-driven again at this point;
                    # make the reported mode say so instead of staying stuck
                    # on "emergency" forever.
                    self.current_mode = FanMode.AUTO
                    logger.info("Safety limits cleared, returning to automatic mode")

                # Snapshot under the lock so the published status is coherent.
                mode = self.current_mode
                profile_name = self.current_profile.name if self.current_profile else "unknown"

            if self.set_pwm(target_pwm):
                self.status = FanStatus(
                    mode=mode,
                    profile=profile_name,
                    current_pwm=target_pwm,
                    target_pwm=target_pwm,
                    temperature=temperature,
                    last_update=current_time,
                    manual_override=(mode == FanMode.MANUAL),
                    emergency_mode=emergency_mode
                )

                # Periodic log line, at most once per log_interval seconds.
                if current_time - self.last_log_time >= self.log_interval:
                    pwm_percent = int(target_pwm * 100 / 255)
                    logger.info(f"Temp: {temperature:.1f}°C | PWM: {target_pwm} ({pwm_percent}%) | Mode: {mode.value}")
                    self.last_log_time = current_time

                self._notify_status_callbacks()

                return True

        except Exception as e:
            logger.error(f"Error updating fan control: {e}")

        return False

    def add_status_callback(self, callback: Callable[[FanStatus], None]):
        """Add a callback function to be called when status updates."""
        self.status_callbacks.append(callback)

    def _notify_status_callbacks(self):
        """Call every registered callback with the latest status.

        An exception raised by a callback is logged and does not stop the
        remaining callbacks.
        """
        if not self.status:
            return

        for callback in self.status_callbacks:
            try:
                callback(self.status)
            except Exception as e:
                logger.error(f"Error in status callback: {e}")

    def run(self):
        """Main control loop: update, then sleep ``update_interval`` seconds.

        Runs until ``running`` becomes False, a KeyboardInterrupt arrives, or
        an unexpected exception escapes (logged).
        """
        logger.info("Starting fan controller...")
        self.running = True

        try:
            while self.running:
                self.update_fan_control()
                time.sleep(self.update_interval)

        except KeyboardInterrupt:
            logger.info("Stopping fan controller...")
            self.running = False
        except Exception as e:
            logger.error(f"Fatal error in fan controller: {e}")
            self.running = False

    def stop(self):
        """Stop the control loop, switch to OFF mode and write PWM 0."""
        logger.info("Stopping fan controller...")
        self.running = False

        # NOTE(review): this leaves PWM 0 written and does not touch
        # pwm1_enable. Confirm that the hardware regains its own fan control
        # after shutdown; otherwise the fan may stay stopped.
        self.set_mode(FanMode.OFF)
        self.set_pwm(0)

    def get_status(self) -> Optional[FanStatus]:
        """Get current fan status (None until the first successful update)."""
        return self.status

    def get_profiles(self) -> Dict[str, FanProfile]:
        """Get all available profiles as a shallow copy of the profile map."""
        return self.profiles.copy()

    def add_profile(self, profile: FanProfile):
        """Add a new fan profile under the key ``profile.name`` and save."""
        with self.lock:
            self.profiles[profile.name] = profile
        self.save_profiles()
        logger.info(f"Added profile: {profile.name}")

    def remove_profile(self, profile_name: str):
        """Remove a fan profile by key (if present) and save."""
        with self.lock:
            if profile_name in self.profiles:
                del self.profiles[profile_name]
        self.save_profiles()
        logger.info(f"Removed profile: {profile_name}")
|
|
|
|
class FanControllerCLI:
    """Command-line interface for fan controller."""

    def __init__(self):
        """Create the CLI wrapper; the controller is built later by run()."""
        self.controller = None

    def setup_logging(self, log_level: str):
        """Setup logging configuration.

        Logs go to stdout and, when it can be opened, to
        ``/var/log/gpu_fan_control.log``.

        Args:
            log_level: Level name such as "INFO"; unknown names fall back to
                INFO.
        """
        numeric_level = getattr(logging, log_level.upper(), logging.INFO)

        handlers = [logging.StreamHandler(sys.stdout)]
        file_error = None
        try:
            # Keep the file handler first, as before.
            handlers.insert(0, logging.FileHandler('/var/log/gpu_fan_control.log'))
        except OSError as e:
            # An unwritable log file (e.g. not running as root) must not stop
            # the tool; keep logging to stdout only.
            file_error = e

        logging.basicConfig(
            level=numeric_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=handlers
        )

        if file_error is not None:
            logger.warning(f"File logging disabled: {file_error}")

    def run(self):
        """Run the fan controller with command-line arguments.

        Exits with status 1 when the controller cannot be initialized or the
        requested profile cannot be selected.
        """
        parser = argparse.ArgumentParser(description='Advanced GPU Fan Controller')
        parser.add_argument('--profile', type=str, help='Fan profile to use')
        # metavar keeps the help text from listing all 256 choices.
        parser.add_argument('--manual-pwm', type=int, choices=range(0, 256), metavar='PWM', help='Manual PWM value (0-255)')
        parser.add_argument('--config', type=str, default='config/fan_profiles.json', help='Configuration file path')
        parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO', help='Log level')
        parser.add_argument('--list-profiles', action='store_true', help='List available profiles')
        parser.add_argument('--daemon', action='store_true', help='Run as daemon')

        args = parser.parse_args()

        self.setup_logging(args.log_level)

        self.controller = FanController(args.config)

        if args.list_profiles:
            self.list_profiles()
            return

        if not self.controller.initialize():
            logger.error("Failed to initialize fan controller")
            sys.exit(1)

        if args.profile:
            if not self.controller.set_profile(args.profile):
                logger.error(f"Failed to set profile: {args.profile}")
                sys.exit(1)

        if args.manual_pwm is not None:
            self.controller.set_manual_pwm(args.manual_pwm)

        # Shut down cleanly on Ctrl-C and on termination requests.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # NOTE(review): --daemon only changes the log output; the process is
        # not detached from the terminal.
        if args.daemon:
            logger.info("Running as daemon...")

        try:
            self.controller.run()
        except KeyboardInterrupt:
            logger.info("Received interrupt signal")

    def list_profiles(self):
        """List available fan profiles.

        Uses the controller created by run(), so the ``--config`` path is
        honoured; when called on its own it falls back to a controller with
        the default configuration path.
        """
        controller = self.controller if self.controller is not None else FanController()

        print("Available fan profiles:")
        for name, profile in controller.profiles.items():
            status = "✓" if profile.enabled else "✗"
            print(f"  {status} {name}: {profile.description}")

    def signal_handler(self, signum, frame):
        """Handle shutdown signals: stop the controller and exit with status 0."""
        logger.info(f"Received signal {signum}, shutting down...")
        if self.controller:
            self.controller.stop()
        sys.exit(0)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the command-line front end and hand over to it.
    FanControllerCLI().run()