""" graph_builder.py — Build the full dependency graph for a Python repository. Edge types (v2): 1. Direct call edges — f() calls g() → f→g 2. Inheritance override edges — Child.method → Parent.method (child only) 3. Shared-import consumer edges— files co-importing same module (capped ≤10) 4. Decorator edges — @decorator applied to a function → fn→decorator 5. Annotated return-type edges — def f() -> MyClass: ... → f→MyClass.__init__ 6. Same-directory sibling edges— one representative per file, light connectivity Performance: - Import maps built ONCE in pre-pass. - _resolve_owner_type results memoized (manual dict cache). """ import ast import logging import os from typing import Dict, List, Optional, Set, Tuple from .scanner import find_python_files from .parser import extract_all_symbols from .resolver import build_import_map from .symbols import ( extract_attribute_ownerships, extract_local_var_types, extract_param_types, _iter_statements, ) from ._warn_once import warn_syntax_error_once, check_and_warn_encoding logger = logging.getLogger(__name__) # Fan-out caps — keep shared-import and sibling edges from becoming mega-hubs _SHARED_IMPORT_MAX_CONSUMERS = 10 # skip if >10 files share the same import _SAME_DIR_MAX_FILES = 20 # skip same-dir bonus if directory is huge _DECORATOR_EDGE_MAX = 6 # max decorator edges per function (guards chains) def build_repository_graph(repo_path: str) -> Dict[str, List[str]]: """ Build the complete call graph for a repository. Returns: dict mapping function_id -> [list of called function_ids] """ repo_path = os.path.abspath(repo_path) functions = extract_all_symbols(repo_path) function_ids = set(functions) # ── pre-pass: per-file ASTs, import maps, class registry ───────────── file_trees: Dict[str, ast.Module] = {} import_maps: Dict[str, Dict[str, str]] = {} class_registry: Dict[str, List[str]] = {} # class_name -> [rel_file, ...] classes_by_file: Dict[str, List[str]] = {} # rel_file -> [class_name, ...] inheritance: Dict[str, List[Tuple]] = {} # "rel_file:ClassName" -> bases factory_returns: Dict[str, Tuple] = {} # "rel_file:func" -> (q, type) # Module-level var types: rel_file -> {var_name: (qualifier, bare_name)} # e.g. `app = Flask(__name__)` at module scope -> {"app": (None, "Flask")} module_var_types: Dict[str, Dict[str, Tuple]] = {} for filename in find_python_files(repo_path): with open(filename, "rb") as f: raw = f.read() check_and_warn_encoding(logger, filename, raw) source = raw.decode("utf-8", errors="ignore") try: tree = ast.parse(source) except SyntaxError as e: warn_syntax_error_once(logger, filename, e) continue relative_file = "./" + os.path.relpath(filename, repo_path) file_trees[relative_file] = tree import_maps[relative_file] = build_import_map(filename, repo_path) mvt: Dict[str, Tuple] = {} for node in tree.body: if isinstance(node, ast.ClassDef): class_registry.setdefault(node.name, []).append(relative_file) classes_by_file.setdefault(relative_file, []).append(node.name) bases = [] for b in node.bases: if isinstance(b, ast.Name): bases.append((None, b.id)) elif isinstance(b, ast.Attribute) and isinstance(b.value, ast.Name): bases.append((b.value.id, b.attr)) elif isinstance(b, ast.Attribute): bases.append((None, b.attr)) inheritance[f"{relative_file}:{node.name}"] = bases elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): ref = _find_return_type(node) if ref: factory_returns[f"{relative_file}:{node.name}"] = ref ann_ref = _find_annotated_return(node) if ann_ref and not ref: factory_returns[f"{relative_file}:{node.name}"] = ann_ref elif isinstance(node, ast.Assign): # Track module-level: `app = Flask(__name__)` if isinstance(node.value, ast.Call): func = node.value.func if isinstance(func, ast.Name): type_ref = (None, func.id) elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name): type_ref = (func.value.id, func.attr) else: type_ref = None if type_ref: for tgt in node.targets: if isinstance(tgt, ast.Name): mvt[tgt.id] = type_ref elif isinstance(node, ast.AnnAssign): # Track: `db: SQLAlchemy = SQLAlchemy(app)` if node.value and isinstance(node.value, ast.Call): func = node.value.func if isinstance(func, ast.Name): type_ref = (None, func.id) elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name): type_ref = (func.value.id, func.attr) else: type_ref = None if type_ref and isinstance(node.target, ast.Name): mvt[node.target.id] = type_ref if mvt: module_var_types[relative_file] = mvt # ── Resolution cache ────────────────────────────────────────────────── # _resolve_owner_type is called O(symbols * calls_per_function) times. # The same (qualifier, bare_name, rel_file) triple recurs constantly on # large repos. Cache the result to avoid redundant work. _resolve_cache: Dict[Tuple, Optional[str]] = {} def _cached_resolve_owner_type(qualifier, bare_name, relative_file, import_map, _seen=None): key = (qualifier, bare_name, relative_file) if key in _resolve_cache: return _resolve_cache[key] result = _resolve_owner_type( qualifier, bare_name, relative_file, import_map, class_registry, factory_returns, import_maps, repo_path, _seen=_seen, classes_by_file=classes_by_file, ) _resolve_cache[key] = result return result # ── Attribute owners ────────────────────────────────────────────────── attribute_owners: Dict[str, str] = {} # Also register module-level vars as attribute owners so that # `app.run()` in a function body resolves to Flask.run. for rel_file, mvt in module_var_types.items(): import_map = import_maps[rel_file] for var_name, type_ref in mvt.items(): qualifier, bare_name = type_ref resolved = _resolve_owner_type( qualifier, bare_name, rel_file, import_map, class_registry, factory_returns, import_maps, repo_path, classes_by_file=classes_by_file, ) if resolved: # Key format matches attribute_owners: "ClassName.attr" # but here var_name is the module-level variable. attribute_owners[f"{rel_file}:{var_name}"] = resolved for relative_file, tree in file_trees.items(): import_map = import_maps[relative_file] raw_own = extract_attribute_ownerships(tree) for key, type_ref in raw_own.items(): if type_ref is None: continue qualifier, bare_name = type_ref resolved = _cached_resolve_owner_type(qualifier, bare_name, relative_file, import_map) if resolved: attribute_owners[f"{relative_file}:{key}"] = resolved # ── Build the call graph ────────────────────────────────────────────── graph: Dict[str, List[str]] = {} for relative_file, tree in file_trees.items(): import_map = import_maps[relative_file] local_name_to_id = { fid.split(":", 1)[1]: fid for fid in functions if fid.startswith(relative_file + ":") } function_nodes = _collect_function_nodes(tree) for fn_node, is_method, class_name in function_nodes: function_name = f"{class_name}.{fn_node.name}" if class_name else fn_node.name function_id = f"{relative_file}:{function_name}" graph.setdefault(function_id, []) param_types = extract_param_types(fn_node) local_var_types = {**param_types, **extract_local_var_types(fn_node, param_types)} for child in ast.walk(fn_node): if not isinstance(child, ast.Call): continue dep = _resolve_call( child, is_method, class_name, relative_file, local_name_to_id, import_map, functions, function_ids, repo_path, attribute_owners, inheritance, class_registry, factory_returns, import_maps, local_var_types, classes_by_file, _cached_resolve_owner_type, ) if dep and dep != function_id and dep not in graph[function_id]: graph[function_id].append(dep) # ── Phase 1A: Inheritance override edges ───────────────────────────── # When ChildClass overrides ParentClass.method, add child → parent edge. # Only child → parent direction: "if I changed the child, show me the # parent contract I might be violating." The reverse (parent → all 400 # children) would create mega-hubs that destroy ranking. for child_key, bases in inheritance.items(): child_file, child_class = child_key.split(":", 1) for qualifier, base_name in bases: base_owner = _resolve_owner_type( qualifier, base_name, child_file, import_maps.get(child_file, {}), class_registry, factory_returns, import_maps, repo_path, classes_by_file=classes_by_file, ) if not base_owner: continue # Find all methods in the child class and look for same-named parent methods child_prefix = f"{child_key}." for fid in function_ids: if not fid.startswith(child_prefix): continue method_name = fid[len(child_prefix):] parent_method = f"{base_owner}.{method_name}" if parent_method in function_ids and parent_method != fid: # Child → parent only (avoids mega-hub on parent side) if parent_method not in graph.get(fid, []): graph.setdefault(fid, []).append(parent_method) # ── Phase 1B: Decorator edges ───────────────────────────────────────── # A function decorated with @my_decorator has an implicit dependency on # my_decorator. This is one of the strongest co-change signals: # if the decorator changes, all its callsites likely change too. for relative_file, tree in file_trees.items(): import_map = import_maps[relative_file] for fn_node, _is_method, _class_name in _collect_function_nodes(tree): added = 0 for deco in fn_node.decorator_list: if added >= _DECORATOR_EDGE_MAX: break # @plain_name or @module.name or @name(args) deco_func = deco.func if isinstance(deco, ast.Call) else deco dep = None if isinstance(deco_func, ast.Name): dep = _lookup( deco_func.id, {fid.split(":", 1)[1]: fid for fid in functions if fid.startswith(relative_file + ":")}, import_map, functions, repo_path, ) elif isinstance(deco_func, ast.Attribute) and isinstance(deco_func.value, ast.Name): if deco_func.value.id in import_map: src = import_map[deco_func.value.id] rel_src = "./" + os.path.relpath(src, repo_path) dep = f"{rel_src}:{deco_func.attr}" if dep not in function_ids: dep = None if dep: fn_name_local = ( f"{_class_name}.{fn_node.name}" if _class_name else fn_node.name ) fid = f"{relative_file}:{fn_name_local}" if fid in function_ids and dep != fid and dep not in graph.get(fid, []): graph.setdefault(fid, []).append(dep) added += 1 # ── Build file_groups for use by shared-import edges ──────────────────── file_groups: Dict[str, List[str]] = {} for fid in function_ids: ffile = fid.split(":")[0] file_groups.setdefault(ffile, []).append(fid) # ── Phase 1C: Shared-import consumer edges ─────────────────────────── # If file_a and file_b both import from the same internal module, # functions in file_a and file_b are likely to co-change when that # module changes. Create edges between functions across those files. import_consumers: Dict[str, List[str]] = {} for rel_file, imap in import_maps.items(): for _local_name, abs_path in imap.items(): # Only track internal (in-repo) imports rel_imported = "./" + os.path.relpath(abs_path, repo_path) if not rel_imported.startswith("./"): continue import_consumers.setdefault(rel_imported, []).append(rel_file) # Deduplicate consumer file lists, then connect representatives for _imported_mod, consumer_files in import_consumers.items(): consumer_files = list(dict.fromkeys(consumer_files)) # deduplicate, preserve order if len(consumer_files) < 2 or len(consumer_files) > _SHARED_IMPORT_MAX_CONSUMERS: continue # Pick a representative symbol from each consumer file representatives = [] for cfile in consumer_files: rep_syms = file_groups.get(cfile, []) if rep_syms: representatives.append(rep_syms[0]) # Connect representatives pairwise for i, a in enumerate(representatives): for b in representatives[i + 1:]: if b not in graph.get(a, []): graph.setdefault(a, []).append(b) if a not in graph.get(b, []): graph.setdefault(b, []).append(a) # ── Phase 1D: Same-directory sibling edges ──────────────────────────── # Files in the same package directory tend to co-change (tests ↔ impl, # models ↔ serializers, etc.). Connect one representative per file to # one representative from every other file in the same directory. # Cap: skip directories with >_SAME_DIR_MAX_FILES Python files to avoid # linking unrelated utility grab-bags. dir_files: Dict[str, List[str]] = {} for rel_file in file_groups: dir_part = os.path.dirname(rel_file) dir_files.setdefault(dir_part, []).append(rel_file) for _dir, dir_file_list in dir_files.items(): if len(dir_file_list) < 2 or len(dir_file_list) > _SAME_DIR_MAX_FILES: continue reps = [] for df in dir_file_list: syms = file_groups.get(df, []) if syms: reps.append(syms[0]) # one rep per file for i, a in enumerate(reps): for b in reps[i + 1:]: if b not in graph.get(a, []): graph.setdefault(a, []).append(b) if a not in graph.get(b, []): graph.setdefault(b, []).append(a) # ── Phase 1E: Sliding-window within-file edges ──────────────────────── # Functions defined near each other in the same file are empirically the # strongest co-change signal after direct calls. A sliding window of # WINDOW_SIZE links each function to its closest neighbours in definition # order. Cap: skip files with >FILE_WINDOW_MAX_SYMS symbols (e.g. a # 600-function god-file would create O(n*w) noise edges). WINDOW_SIZE = 3 # each function linked to ±3 neighbours FILE_WINDOW_MAX_SYMS = 60 # skip window edges in very large files for rel_file, syms_in_file in file_groups.items(): if len(syms_in_file) < 2 or len(syms_in_file) > FILE_WINDOW_MAX_SYMS: continue for i, a in enumerate(syms_in_file): for j in range(i + 1, min(i + 1 + WINDOW_SIZE, len(syms_in_file))): b = syms_in_file[j] if b not in graph.get(a, []): graph.setdefault(a, []).append(b) if a not in graph.get(b, []): graph.setdefault(b, []).append(a) # ── Phase 1F: Light parent→child inheritance edges ──────────────────── # Child→parent already exists (Phase 1A). For parents with FEW children # (≤PARENT_CHILD_MAX_CHILDREN), also add parent→child so that a change # to the parent method surfaces its direct overriders. We skip parents # with many children to avoid creating mega-hubs (e.g. BaseModel in # pydantic has 400+ subclasses — those would destroy ranking). PARENT_CHILD_MAX_CHILDREN = 8 # Build a map: parent_method_id -> [child_method_ids] parent_to_children: Dict[str, List[str]] = {} for child_key, bases in inheritance.items(): child_file, child_class = child_key.split(":", 1) for qualifier, base_name in bases: base_owner = _resolve_owner_type( qualifier, base_name, child_file, import_maps.get(child_file, {}), class_registry, factory_returns, import_maps, repo_path, classes_by_file=classes_by_file, ) if not base_owner: continue child_prefix = f"{child_key}." for fid in function_ids: if not fid.startswith(child_prefix): continue method_name = fid[len(child_prefix):] parent_method = f"{base_owner}.{method_name}" if parent_method in function_ids and parent_method != fid: parent_to_children.setdefault(parent_method, []).append(fid) for parent_method, children in parent_to_children.items(): if len(children) > PARENT_CHILD_MAX_CHILDREN: continue # too many — would create a mega-hub for child_fid in children: if child_fid not in graph.get(parent_method, []): graph.setdefault(parent_method, []).append(child_fid) return graph # ── Internal helpers ────────────────────────────────────────────────────── def _collect_function_nodes(tree): """ Return list of (function_node, is_method, class_name) for EVERY function in the file, including nested functions and closures. Mirrors what parser.py's _FunctionCollector does so that the graph covers every symbol the parser emits (previously missed ~30-40% of nodes). """ result = [] _collect_recursive(tree.body, class_stack=[], result=result) return result def _collect_recursive(stmts, class_stack, result): """Recursively collect function nodes from a list of statements.""" for node in stmts: if isinstance(node, ast.ClassDef): class_stack.append(node.name) _collect_recursive(node.body, class_stack, result) class_stack.pop() elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): if class_stack: class_name = ".".join(class_stack) is_method = True else: class_name = None is_method = False result.append((node, is_method, class_name)) # Also recurse into the function body to catch closures/nested funcs _collect_recursive(node.body, class_stack, result) def _find_return_type(node): found = None for stmt in _iter_statements(node.body): if isinstance(stmt, ast.Return) and stmt.value is not None: if not isinstance(stmt.value, ast.Call): return None func = stmt.value.func if isinstance(func, ast.Name): ref = (None, func.id) elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name): ref = (func.value.id, func.attr) elif isinstance(func, ast.Attribute): ref = (None, func.attr) else: return None if found is None: found = ref elif found != ref: return None return found def _find_annotated_return(node): """ Extract (qualifier, bare_name) from a PEP-3107 return annotation. def f() -> MyClass: ... -> (None, "MyClass") def f() -> module.MyClass: ... -> ("module", "MyClass") Skips primitive annotations (str, int, bool, None, etc.) and generics like List[X] where the outer type is not a class we own. """ PRIMITIVES = frozenset({ "str", "int", "float", "bool", "bytes", "None", "list", "dict", "set", "tuple", "Any", "Optional", "List", "Dict", "Set", "Tuple", "Iterator", "Generator", "Iterable", "Sequence", "Mapping", "Type", "Union", }) ann = node.returns if ann is None: return None if isinstance(ann, ast.Constant): return None if isinstance(ann, ast.Name): if ann.id in PRIMITIVES: return None return (None, ann.id) if isinstance(ann, ast.Attribute) and isinstance(ann.value, ast.Name): if ann.attr in PRIMITIVES: return None return (ann.value.id, ann.attr) # Subscript like Optional[Router] — unwrap one level if isinstance(ann, ast.Subscript): return _find_annotated_return( type("_Stub", (), {"returns": ann.slice})() # type: ignore[arg-type] ) return None def _resolve_owner_type( qualifier, bare_name, relative_file, import_map, class_registry, factory_returns, import_maps, repo_path, _seen=None, classes_by_file=None, ): if _seen is None: _seen = set() cache_key = (qualifier, bare_name, relative_file) if cache_key in _seen: return None _seen.add(cache_key) if qualifier: if qualifier not in import_map: return None target_file = "./" + os.path.relpath(import_map[qualifier], repo_path) if bare_name in class_registry: if target_file in class_registry[bare_name]: return f"{target_file}:{bare_name}" if target_file.endswith("__init__.py"): target_dir = target_file[:-12] for cand_file in class_registry[bare_name]: if cand_file.startswith(target_dir + "/"): return f"{cand_file}:{bare_name}" factory_key = f"{target_file}:{bare_name}" if factory_key in factory_returns: return _resolve_owner_type( *factory_returns[factory_key], target_file, import_maps.get(target_file, {}), class_registry, factory_returns, import_maps, repo_path, _seen, classes_by_file, ) return None # bare name if bare_name in class_registry and relative_file in class_registry[bare_name]: return f"{relative_file}:{bare_name}" if bare_name in import_map: target_file = "./" + os.path.relpath(import_map[bare_name], repo_path) if bare_name in class_registry: if target_file in class_registry[bare_name]: return f"{target_file}:{bare_name}" if target_file.endswith("__init__.py"): target_dir = target_file[:-12] for cand_file in class_registry[bare_name]: if cand_file.startswith(target_dir + "/"): return f"{cand_file}:{bare_name}" elif classes_by_file: file_classes = classes_by_file.get(target_file, []) if len(file_classes) == 1: return f"{target_file}:{file_classes[0]}" factory_key = f"{target_file}:{bare_name}" if factory_key in factory_returns: return _resolve_owner_type( *factory_returns[factory_key], target_file, import_maps.get(target_file, {}), class_registry, factory_returns, import_maps, repo_path, _seen, classes_by_file, ) return None factory_key = f"{relative_file}:{bare_name}" if factory_key in factory_returns: return _resolve_owner_type( *factory_returns[factory_key], relative_file, import_map, class_registry, factory_returns, import_maps, repo_path, _seen, classes_by_file=classes_by_file, ) return None def _resolve_owner_of_expr( node, class_name, relative_file, is_method, attribute_owners, local_var_types=None, import_map=None, class_registry=None, factory_returns=None, import_maps=None, repo_path=None, classes_by_file=None, _cached_resolve=None, ): if isinstance(node, ast.Name) and node.id == "self" and is_method and class_name: return f"{relative_file}:{class_name}" if ( isinstance(node, ast.Name) and local_var_types and node.id in local_var_types and import_map is not None ): qualifier, bare_name = local_var_types[node.id] if _cached_resolve: return _cached_resolve(qualifier, bare_name, relative_file, import_map) return _resolve_owner_type( qualifier, bare_name, relative_file, import_map, class_registry or {}, factory_returns or {}, import_maps or {}, repo_path or "", classes_by_file=classes_by_file, ) # Module-level variable fallback: `app = Flask()` at module scope. # attribute_owners stores these as "{rel_file}:{var_name}". if isinstance(node, ast.Name) and attribute_owners: module_key = f"{relative_file}:{node.id}" if module_key in attribute_owners: return attribute_owners[module_key] if isinstance(node, ast.Attribute): base_owner = _resolve_owner_of_expr( node.value, class_name, relative_file, is_method, attribute_owners, local_var_types, import_map, class_registry, factory_returns, import_maps, repo_path, classes_by_file, _cached_resolve, ) if base_owner is None: return None return attribute_owners.get(f"{base_owner}.{node.attr}") return None def _resolve_via_inheritance( owner_type, method_name, inheritance, class_registry, import_maps, function_ids, repo_path, _seen=None, classes_by_file=None, ): if _seen is None: _seen = set() if owner_type in _seen: return None _seen.add(owner_type) rel_file, class_name = owner_type.split(":", 1) import_map = import_maps.get(rel_file, {}) for qualifier, base_name in inheritance.get(owner_type, []): base_owner = _resolve_owner_type( qualifier, base_name, rel_file, import_map, class_registry, {}, import_maps, repo_path, classes_by_file=classes_by_file, ) if not base_owner: continue candidate = f"{base_owner}.{method_name}" if candidate in function_ids: return candidate deeper = _resolve_via_inheritance( base_owner, method_name, inheritance, class_registry, import_maps, function_ids, repo_path, _seen, classes_by_file, ) if deeper: return deeper return None def _resolve_call( call_node, is_method, class_name, relative_file, local_name_to_id, import_map, all_functions, function_ids, repo_path, attribute_owners, inheritance, class_registry, factory_returns, import_maps, local_var_types=None, classes_by_file=None, _cached_resolve=None, ): func = call_node.func if isinstance(func, ast.Name): return _lookup(func.id, local_name_to_id, import_map, all_functions, repo_path) if isinstance(func, ast.Attribute): method_name = func.attr owner_type = _resolve_owner_of_expr( func.value, class_name, relative_file, is_method, attribute_owners, local_var_types, import_map, class_registry, factory_returns, import_maps, repo_path, classes_by_file, _cached_resolve, ) if owner_type: candidate = f"{owner_type}.{method_name}" if candidate in function_ids: return candidate resolved = _resolve_via_inheritance( owner_type, method_name, inheritance, class_registry, import_maps, function_ids, repo_path, classes_by_file=classes_by_file, ) if resolved: return resolved return None if isinstance(func.value, ast.Name) and func.value.id in import_map: source_file = import_map[func.value.id] relative_source = "./" + os.path.relpath(source_file, repo_path) candidate = f"{relative_source}:{method_name}" if candidate in all_functions: return candidate return None def _lookup(called_name, local_name_to_id, import_map, all_functions, repo_path): if called_name in local_name_to_id: return local_name_to_id[called_name] if called_name in import_map: source_file = import_map[called_name] relative_source = "./" + os.path.relpath(source_file, repo_path) candidate = f"{relative_source}:{called_name}" if candidate in all_functions: return candidate return None