| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import os |
| import os.path as osp |
|
|
| from huggingface_hub import repo_exists, snapshot_download |
| from huggingface_hub.utils import HFValidationError, validate_repo_id |
| from transformers import AutoConfig, AutoTokenizer, PretrainedConfig |
|
|
| from .configuration_vila import VILAConfig |
| from .constants import MEDIA_TOKENS |
| from .tokenizer_utils import infer_stop_tokens |
|
|
|
|
| def load_tokenizer_then_handle_media_tokens_and_chat_template( |
| model_name_or_path, config: VILAConfig, model_max_length=None |
| ): |
| tokenizer = AutoTokenizer.from_pretrained( |
| osp.join(model_name_or_path, "llm"), padding_side="right", use_fast=True, legacy=False |
| ) |
| if model_max_length is not None: |
| tokenizer.model_max_length = model_max_length |
|
|
| |
| if getattr(config, "chat_template", None) is not None: |
| print(f"Using chat template: {config.chat_template}") |
| fpath = os.path.join(os.path.dirname(__file__), "chat_templates", f"{config.chat_template}.jinja") |
| if not os.path.exists(fpath): |
| fpath = os.path.join(model_name_or_path, f"{config.chat_template}.jinja") |
| with open(fpath) as fd: |
| chat_template = fd.read() |
| tokenizer.chat_template = chat_template.replace(" ", "").replace("\n", "") |
|
|
| |
| tokenizer.stop_tokens = infer_stop_tokens(tokenizer) |
| tokenizer.stop_token_ids = tokenizer.convert_tokens_to_ids(tokenizer.stop_tokens) |
|
|
| |
| tokenizer.media_tokens = MEDIA_TOKENS |
| tokenizer.media_token_ids = {} |
| for name, token in MEDIA_TOKENS.items(): |
| tokenizer.add_tokens([token], special_tokens=True) |
| tokenizer.media_token_ids[name] = tokenizer.convert_tokens_to_ids(token) |
|
|
| return tokenizer |
|
|
|
|
| def get_model_config(config): |
| default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"] |
|
|
| if hasattr(config, "_name_or_path") and len(config._name_or_path) >= 2: |
| root_path = config._name_or_path |
| else: |
| root_path = config.resume_path |
|
|
| |
| if root_path is not None and not osp.exists(root_path): |
| try: |
| valid_hf_repo = repo_exists(root_path) |
| except HFValidationError as e: |
| valid_hf_repo = False |
| if valid_hf_repo: |
| root_path = snapshot_download(root_path) |
|
|
| return_list = [] |
| for key in default_keys: |
| cfg = getattr(config, key, None) |
| if isinstance(cfg, dict): |
| try: |
| return_list.append(os.path.join(root_path, key[:-4])) |
| except: |
| raise ValueError(f"Cannot find resume path in config for {key}!") |
| elif isinstance(cfg, PretrainedConfig): |
| return_list.append(os.path.join(root_path, key[:-4])) |
| elif isinstance(cfg, str): |
| return_list.append(cfg) |
|
|
| return return_list |
|
|
|
|
| def get_model_config_fp8(config): |
| default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"] |
|
|
| if hasattr(config, "_name_or_path") and len(config._name_or_path) >= 2: |
| root_path = config._name_or_path |
| else: |
| root_path = config.resume_path |
|
|
| |
| if root_path is not None and not osp.exists(root_path): |
| try: |
| valid_hf_repo = repo_exists(root_path) |
| except HFValidationError as e: |
| valid_hf_repo = False |
| if valid_hf_repo: |
| root_path = snapshot_download(root_path) |
|
|
| return_list = [] |
| for key in default_keys: |
| cfg = getattr(config, key, None) |
| if isinstance(cfg, dict): |
| try: |
| return_list.append(os.path.join(root_path, key[:-4])) |
| except: |
| raise ValueError(f"Cannot find resume path in config for {key}!") |
| elif isinstance(cfg, PretrainedConfig): |
| return_list.append(os.path.join(root_path, key[:-4])) |
| elif isinstance(cfg, str): |
| return_list.append(cfg) |
|
|
| |
| key = "fp8_llm_cfg" |
| directory_path = os.path.join(root_path, key[:-4]) |
| assert os.path.isdir(directory_path) and os.listdir( |
| directory_path |
| ), "You need to first convert the model weights to FP8 explicitly." |
| return_list.append(directory_path) |
|
|
| return return_list |
|
|
|
|
| def get_model_config_fp8(config): |
| default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"] |
|
|
| if hasattr(config, "_name_or_path") and len(config._name_or_path) >= 2: |
| root_path = config._name_or_path |
| else: |
| root_path = config.resume_path |
|
|
| |
| if root_path is not None and not osp.exists(root_path): |
| try: |
| valid_hf_repo = repo_exists(root_path) |
| except HFValidationError as e: |
| valid_hf_repo = False |
| if valid_hf_repo: |
| root_path = snapshot_download(root_path) |
|
|
| return_list = [] |
| for key in default_keys: |
| cfg = getattr(config, key, None) |
| if isinstance(cfg, dict): |
| try: |
| return_list.append(os.path.join(root_path, key[:-4])) |
| except: |
| raise ValueError(f"Cannot find resume path in config for {key}!") |
| elif isinstance(cfg, PretrainedConfig): |
| return_list.append(os.path.join(root_path, key[:-4])) |
| elif isinstance(cfg, str): |
| return_list.append(cfg) |
|
|
| |
| key = "fp8_llm_cfg" |
| directory_path = os.path.join(root_path, key[:-4]) |
| assert os.path.isdir(directory_path) and os.listdir( |
| directory_path |
| ), "You need to first convert the model weights to FP8 explicitly." |
| return_list.append(directory_path) |
|
|
| return return_list |
|
|
|
|
| def is_mm_model(model_path): |
| """ |
| Check if the model at the given path is a visual language model. |
| |
| Args: |
| model_path (str): The path to the model. |
| |
| Returns: |
| bool: True if the model is an MM model, False otherwise. |
| """ |
| config = AutoConfig.from_pretrained(model_path) |
| architectures = config.architectures |
| for architecture in architectures: |
| if "llava" in architecture.lower(): |
| return True |
| return False |
|
|
|
|
| def auto_upgrade(config): |
| cfg = AutoConfig.from_pretrained(config) |
| if "llava" in config and "llava" not in cfg.model_type: |
| assert cfg.model_type == "llama" |
| print("You are using newer LLaVA code base, while the checkpoint of v0 is from older code base.") |
| print("You must upgrade the checkpoint to the new code base (this can be done automatically).") |
| confirm = input("Please confirm that you want to upgrade the checkpoint. [Y/N]") |
| if confirm.lower() in ["y", "yes"]: |
| print("Upgrading checkpoint...") |
| assert len(cfg.architectures) == 1 |
| setattr(cfg.__class__, "model_type", "llava") |
| cfg.architectures[0] = "LlavaLlamaForCausalLM" |
| cfg.save_pretrained(config) |
| print("Checkpoint upgraded.") |
| else: |
| print("Checkpoint upgrade aborted.") |
| exit(1) |
|
|