Diffcontext / diffcontext /graph_builder.py
trakshan-mishra
Deploy FastAPI & MCP server over SSE
036a2db
Raw
History Blame Contribute Delete
31.7 kB
"""
graph_builder.py — Build the full dependency graph for a Python repository.
Edge types (v2):
1. Direct call edges — f() calls g() → f→g
2. Inheritance override edges — Child.method → Parent.method (child only)
3. Shared-import consumer edges— files co-importing same module (capped ≤10)
4. Decorator edges — @decorator applied to a function → fn→decorator
5. Annotated return-type edges — def f() -> MyClass: ... → f→MyClass.__init__
6. Same-directory sibling edges— one representative per file, light connectivity
Performance:
- Import maps built ONCE in pre-pass.
- _resolve_owner_type results memoized (manual dict cache).
"""
import ast
import logging
import os
from typing import Dict, List, Optional, Set, Tuple
from .scanner import find_python_files
from .parser import extract_all_symbols
from .resolver import build_import_map
from .symbols import (
extract_attribute_ownerships,
extract_local_var_types,
extract_param_types,
_iter_statements,
)
from ._warn_once import warn_syntax_error_once, check_and_warn_encoding
logger = logging.getLogger(__name__)
# Fan-out caps — keep shared-import and sibling edges from becoming mega-hubs
_SHARED_IMPORT_MAX_CONSUMERS = 10 # skip if >10 files share the same import
_SAME_DIR_MAX_FILES = 20 # skip same-dir bonus if directory is huge
_DECORATOR_EDGE_MAX = 6 # max decorator edges per function (guards chains)
def build_repository_graph(repo_path: str) -> Dict[str, List[str]]:
"""
Build the complete call graph for a repository.
Returns:
dict mapping function_id -> [list of called function_ids]
"""
repo_path = os.path.abspath(repo_path)
functions = extract_all_symbols(repo_path)
function_ids = set(functions)
# ── pre-pass: per-file ASTs, import maps, class registry ─────────────
file_trees: Dict[str, ast.Module] = {}
import_maps: Dict[str, Dict[str, str]] = {}
class_registry: Dict[str, List[str]] = {} # class_name -> [rel_file, ...]
classes_by_file: Dict[str, List[str]] = {} # rel_file -> [class_name, ...]
inheritance: Dict[str, List[Tuple]] = {} # "rel_file:ClassName" -> bases
factory_returns: Dict[str, Tuple] = {} # "rel_file:func" -> (q, type)
# Module-level var types: rel_file -> {var_name: (qualifier, bare_name)}
# e.g. `app = Flask(__name__)` at module scope -> {"app": (None, "Flask")}
module_var_types: Dict[str, Dict[str, Tuple]] = {}
for filename in find_python_files(repo_path):
with open(filename, "rb") as f:
raw = f.read()
check_and_warn_encoding(logger, filename, raw)
source = raw.decode("utf-8", errors="ignore")
try:
tree = ast.parse(source)
except SyntaxError as e:
warn_syntax_error_once(logger, filename, e)
continue
relative_file = "./" + os.path.relpath(filename, repo_path)
file_trees[relative_file] = tree
import_maps[relative_file] = build_import_map(filename, repo_path)
mvt: Dict[str, Tuple] = {}
for node in tree.body:
if isinstance(node, ast.ClassDef):
class_registry.setdefault(node.name, []).append(relative_file)
classes_by_file.setdefault(relative_file, []).append(node.name)
bases = []
for b in node.bases:
if isinstance(b, ast.Name):
bases.append((None, b.id))
elif isinstance(b, ast.Attribute) and isinstance(b.value, ast.Name):
bases.append((b.value.id, b.attr))
elif isinstance(b, ast.Attribute):
bases.append((None, b.attr))
inheritance[f"{relative_file}:{node.name}"] = bases
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
ref = _find_return_type(node)
if ref:
factory_returns[f"{relative_file}:{node.name}"] = ref
ann_ref = _find_annotated_return(node)
if ann_ref and not ref:
factory_returns[f"{relative_file}:{node.name}"] = ann_ref
elif isinstance(node, ast.Assign):
# Track module-level: `app = Flask(__name__)`
if isinstance(node.value, ast.Call):
func = node.value.func
if isinstance(func, ast.Name):
type_ref = (None, func.id)
elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name):
type_ref = (func.value.id, func.attr)
else:
type_ref = None
if type_ref:
for tgt in node.targets:
if isinstance(tgt, ast.Name):
mvt[tgt.id] = type_ref
elif isinstance(node, ast.AnnAssign):
# Track: `db: SQLAlchemy = SQLAlchemy(app)`
if node.value and isinstance(node.value, ast.Call):
func = node.value.func
if isinstance(func, ast.Name):
type_ref = (None, func.id)
elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name):
type_ref = (func.value.id, func.attr)
else:
type_ref = None
if type_ref and isinstance(node.target, ast.Name):
mvt[node.target.id] = type_ref
if mvt:
module_var_types[relative_file] = mvt
# ── Resolution cache ──────────────────────────────────────────────────
# _resolve_owner_type is called O(symbols * calls_per_function) times.
# The same (qualifier, bare_name, rel_file) triple recurs constantly on
# large repos. Cache the result to avoid redundant work.
_resolve_cache: Dict[Tuple, Optional[str]] = {}
def _cached_resolve_owner_type(qualifier, bare_name, relative_file, import_map, _seen=None):
key = (qualifier, bare_name, relative_file)
if key in _resolve_cache:
return _resolve_cache[key]
result = _resolve_owner_type(
qualifier, bare_name, relative_file, import_map,
class_registry, factory_returns, import_maps, repo_path,
_seen=_seen,
classes_by_file=classes_by_file,
)
_resolve_cache[key] = result
return result
# ── Attribute owners ──────────────────────────────────────────────────
attribute_owners: Dict[str, str] = {}
# Also register module-level vars as attribute owners so that
# `app.run()` in a function body resolves to Flask.run.
for rel_file, mvt in module_var_types.items():
import_map = import_maps[rel_file]
for var_name, type_ref in mvt.items():
qualifier, bare_name = type_ref
resolved = _resolve_owner_type(
qualifier, bare_name, rel_file, import_map,
class_registry, factory_returns, import_maps, repo_path,
classes_by_file=classes_by_file,
)
if resolved:
# Key format matches attribute_owners: "ClassName.attr"
# but here var_name is the module-level variable.
attribute_owners[f"{rel_file}:{var_name}"] = resolved
for relative_file, tree in file_trees.items():
import_map = import_maps[relative_file]
raw_own = extract_attribute_ownerships(tree)
for key, type_ref in raw_own.items():
if type_ref is None:
continue
qualifier, bare_name = type_ref
resolved = _cached_resolve_owner_type(qualifier, bare_name, relative_file, import_map)
if resolved:
attribute_owners[f"{relative_file}:{key}"] = resolved
# ── Build the call graph ──────────────────────────────────────────────
graph: Dict[str, List[str]] = {}
for relative_file, tree in file_trees.items():
import_map = import_maps[relative_file]
local_name_to_id = {
fid.split(":", 1)[1]: fid
for fid in functions
if fid.startswith(relative_file + ":")
}
function_nodes = _collect_function_nodes(tree)
for fn_node, is_method, class_name in function_nodes:
function_name = f"{class_name}.{fn_node.name}" if class_name else fn_node.name
function_id = f"{relative_file}:{function_name}"
graph.setdefault(function_id, [])
param_types = extract_param_types(fn_node)
local_var_types = {**param_types, **extract_local_var_types(fn_node, param_types)}
for child in ast.walk(fn_node):
if not isinstance(child, ast.Call):
continue
dep = _resolve_call(
child,
is_method,
class_name,
relative_file,
local_name_to_id,
import_map,
functions,
function_ids,
repo_path,
attribute_owners,
inheritance,
class_registry,
factory_returns,
import_maps,
local_var_types,
classes_by_file,
_cached_resolve_owner_type,
)
if dep and dep != function_id and dep not in graph[function_id]:
graph[function_id].append(dep)
# ── Phase 1A: Inheritance override edges ─────────────────────────────
# When ChildClass overrides ParentClass.method, add child → parent edge.
# Only child → parent direction: "if I changed the child, show me the
# parent contract I might be violating." The reverse (parent → all 400
# children) would create mega-hubs that destroy ranking.
for child_key, bases in inheritance.items():
child_file, child_class = child_key.split(":", 1)
for qualifier, base_name in bases:
base_owner = _resolve_owner_type(
qualifier, base_name, child_file,
import_maps.get(child_file, {}),
class_registry, factory_returns, import_maps, repo_path,
classes_by_file=classes_by_file,
)
if not base_owner:
continue
# Find all methods in the child class and look for same-named parent methods
child_prefix = f"{child_key}."
for fid in function_ids:
if not fid.startswith(child_prefix):
continue
method_name = fid[len(child_prefix):]
parent_method = f"{base_owner}.{method_name}"
if parent_method in function_ids and parent_method != fid:
# Child → parent only (avoids mega-hub on parent side)
if parent_method not in graph.get(fid, []):
graph.setdefault(fid, []).append(parent_method)
# ── Phase 1B: Decorator edges ─────────────────────────────────────────
# A function decorated with @my_decorator has an implicit dependency on
# my_decorator. This is one of the strongest co-change signals:
# if the decorator changes, all its callsites likely change too.
for relative_file, tree in file_trees.items():
import_map = import_maps[relative_file]
for fn_node, _is_method, _class_name in _collect_function_nodes(tree):
added = 0
for deco in fn_node.decorator_list:
if added >= _DECORATOR_EDGE_MAX:
break
# @plain_name or @module.name or @name(args)
deco_func = deco.func if isinstance(deco, ast.Call) else deco
dep = None
if isinstance(deco_func, ast.Name):
dep = _lookup(
deco_func.id,
{fid.split(":", 1)[1]: fid
for fid in functions
if fid.startswith(relative_file + ":")},
import_map, functions, repo_path,
)
elif isinstance(deco_func, ast.Attribute) and isinstance(deco_func.value, ast.Name):
if deco_func.value.id in import_map:
src = import_map[deco_func.value.id]
rel_src = "./" + os.path.relpath(src, repo_path)
dep = f"{rel_src}:{deco_func.attr}"
if dep not in function_ids:
dep = None
if dep:
fn_name_local = (
f"{_class_name}.{fn_node.name}"
if _class_name else fn_node.name
)
fid = f"{relative_file}:{fn_name_local}"
if fid in function_ids and dep != fid and dep not in graph.get(fid, []):
graph.setdefault(fid, []).append(dep)
added += 1
# ── Build file_groups for use by shared-import edges ────────────────────
file_groups: Dict[str, List[str]] = {}
for fid in function_ids:
ffile = fid.split(":")[0]
file_groups.setdefault(ffile, []).append(fid)
# ── Phase 1C: Shared-import consumer edges ───────────────────────────
# If file_a and file_b both import from the same internal module,
# functions in file_a and file_b are likely to co-change when that
# module changes. Create edges between functions across those files.
import_consumers: Dict[str, List[str]] = {}
for rel_file, imap in import_maps.items():
for _local_name, abs_path in imap.items():
# Only track internal (in-repo) imports
rel_imported = "./" + os.path.relpath(abs_path, repo_path)
if not rel_imported.startswith("./"):
continue
import_consumers.setdefault(rel_imported, []).append(rel_file)
# Deduplicate consumer file lists, then connect representatives
for _imported_mod, consumer_files in import_consumers.items():
consumer_files = list(dict.fromkeys(consumer_files)) # deduplicate, preserve order
if len(consumer_files) < 2 or len(consumer_files) > _SHARED_IMPORT_MAX_CONSUMERS:
continue
# Pick a representative symbol from each consumer file
representatives = []
for cfile in consumer_files:
rep_syms = file_groups.get(cfile, [])
if rep_syms:
representatives.append(rep_syms[0])
# Connect representatives pairwise
for i, a in enumerate(representatives):
for b in representatives[i + 1:]:
if b not in graph.get(a, []):
graph.setdefault(a, []).append(b)
if a not in graph.get(b, []):
graph.setdefault(b, []).append(a)
# ── Phase 1D: Same-directory sibling edges ────────────────────────────
# Files in the same package directory tend to co-change (tests ↔ impl,
# models ↔ serializers, etc.). Connect one representative per file to
# one representative from every other file in the same directory.
# Cap: skip directories with >_SAME_DIR_MAX_FILES Python files to avoid
# linking unrelated utility grab-bags.
dir_files: Dict[str, List[str]] = {}
for rel_file in file_groups:
dir_part = os.path.dirname(rel_file)
dir_files.setdefault(dir_part, []).append(rel_file)
for _dir, dir_file_list in dir_files.items():
if len(dir_file_list) < 2 or len(dir_file_list) > _SAME_DIR_MAX_FILES:
continue
reps = []
for df in dir_file_list:
syms = file_groups.get(df, [])
if syms:
reps.append(syms[0]) # one rep per file
for i, a in enumerate(reps):
for b in reps[i + 1:]:
if b not in graph.get(a, []):
graph.setdefault(a, []).append(b)
if a not in graph.get(b, []):
graph.setdefault(b, []).append(a)
# ── Phase 1E: Sliding-window within-file edges ────────────────────────
# Functions defined near each other in the same file are empirically the
# strongest co-change signal after direct calls. A sliding window of
# WINDOW_SIZE links each function to its closest neighbours in definition
# order. Cap: skip files with >FILE_WINDOW_MAX_SYMS symbols (e.g. a
# 600-function god-file would create O(n*w) noise edges).
WINDOW_SIZE = 3 # each function linked to ±3 neighbours
FILE_WINDOW_MAX_SYMS = 60 # skip window edges in very large files
for rel_file, syms_in_file in file_groups.items():
if len(syms_in_file) < 2 or len(syms_in_file) > FILE_WINDOW_MAX_SYMS:
continue
for i, a in enumerate(syms_in_file):
for j in range(i + 1, min(i + 1 + WINDOW_SIZE, len(syms_in_file))):
b = syms_in_file[j]
if b not in graph.get(a, []):
graph.setdefault(a, []).append(b)
if a not in graph.get(b, []):
graph.setdefault(b, []).append(a)
# ── Phase 1F: Light parent→child inheritance edges ────────────────────
# Child→parent already exists (Phase 1A). For parents with FEW children
# (≤PARENT_CHILD_MAX_CHILDREN), also add parent→child so that a change
# to the parent method surfaces its direct overriders. We skip parents
# with many children to avoid creating mega-hubs (e.g. BaseModel in
# pydantic has 400+ subclasses — those would destroy ranking).
PARENT_CHILD_MAX_CHILDREN = 8
# Build a map: parent_method_id -> [child_method_ids]
parent_to_children: Dict[str, List[str]] = {}
for child_key, bases in inheritance.items():
child_file, child_class = child_key.split(":", 1)
for qualifier, base_name in bases:
base_owner = _resolve_owner_type(
qualifier, base_name, child_file,
import_maps.get(child_file, {}),
class_registry, factory_returns, import_maps, repo_path,
classes_by_file=classes_by_file,
)
if not base_owner:
continue
child_prefix = f"{child_key}."
for fid in function_ids:
if not fid.startswith(child_prefix):
continue
method_name = fid[len(child_prefix):]
parent_method = f"{base_owner}.{method_name}"
if parent_method in function_ids and parent_method != fid:
parent_to_children.setdefault(parent_method, []).append(fid)
for parent_method, children in parent_to_children.items():
if len(children) > PARENT_CHILD_MAX_CHILDREN:
continue # too many — would create a mega-hub
for child_fid in children:
if child_fid not in graph.get(parent_method, []):
graph.setdefault(parent_method, []).append(child_fid)
return graph
# ── Internal helpers ──────────────────────────────────────────────────────
def _collect_function_nodes(tree):
"""
Return list of (function_node, is_method, class_name) for EVERY function
in the file, including nested functions and closures.
Mirrors what parser.py's _FunctionCollector does so that the graph covers
every symbol the parser emits (previously missed ~30-40% of nodes).
"""
result = []
_collect_recursive(tree.body, class_stack=[], result=result)
return result
def _collect_recursive(stmts, class_stack, result):
"""Recursively collect function nodes from a list of statements."""
for node in stmts:
if isinstance(node, ast.ClassDef):
class_stack.append(node.name)
_collect_recursive(node.body, class_stack, result)
class_stack.pop()
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
if class_stack:
class_name = ".".join(class_stack)
is_method = True
else:
class_name = None
is_method = False
result.append((node, is_method, class_name))
# Also recurse into the function body to catch closures/nested funcs
_collect_recursive(node.body, class_stack, result)
def _find_return_type(node):
found = None
for stmt in _iter_statements(node.body):
if isinstance(stmt, ast.Return) and stmt.value is not None:
if not isinstance(stmt.value, ast.Call):
return None
func = stmt.value.func
if isinstance(func, ast.Name):
ref = (None, func.id)
elif isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name):
ref = (func.value.id, func.attr)
elif isinstance(func, ast.Attribute):
ref = (None, func.attr)
else:
return None
if found is None:
found = ref
elif found != ref:
return None
return found
def _find_annotated_return(node):
"""
Extract (qualifier, bare_name) from a PEP-3107 return annotation.
def f() -> MyClass: ... -> (None, "MyClass")
def f() -> module.MyClass: ... -> ("module", "MyClass")
Skips primitive annotations (str, int, bool, None, etc.) and generics
like List[X] where the outer type is not a class we own.
"""
PRIMITIVES = frozenset({
"str", "int", "float", "bool", "bytes", "None",
"list", "dict", "set", "tuple", "Any", "Optional",
"List", "Dict", "Set", "Tuple", "Iterator", "Generator",
"Iterable", "Sequence", "Mapping", "Type", "Union",
})
ann = node.returns
if ann is None:
return None
if isinstance(ann, ast.Constant):
return None
if isinstance(ann, ast.Name):
if ann.id in PRIMITIVES:
return None
return (None, ann.id)
if isinstance(ann, ast.Attribute) and isinstance(ann.value, ast.Name):
if ann.attr in PRIMITIVES:
return None
return (ann.value.id, ann.attr)
# Subscript like Optional[Router] — unwrap one level
if isinstance(ann, ast.Subscript):
return _find_annotated_return(
type("_Stub", (), {"returns": ann.slice})() # type: ignore[arg-type]
)
return None
def _resolve_owner_type(
qualifier, bare_name, relative_file, import_map,
class_registry, factory_returns, import_maps, repo_path, _seen=None,
classes_by_file=None,
):
if _seen is None:
_seen = set()
cache_key = (qualifier, bare_name, relative_file)
if cache_key in _seen:
return None
_seen.add(cache_key)
if qualifier:
if qualifier not in import_map:
return None
target_file = "./" + os.path.relpath(import_map[qualifier], repo_path)
if bare_name in class_registry:
if target_file in class_registry[bare_name]:
return f"{target_file}:{bare_name}"
if target_file.endswith("__init__.py"):
target_dir = target_file[:-12]
for cand_file in class_registry[bare_name]:
if cand_file.startswith(target_dir + "/"):
return f"{cand_file}:{bare_name}"
factory_key = f"{target_file}:{bare_name}"
if factory_key in factory_returns:
return _resolve_owner_type(
*factory_returns[factory_key], target_file,
import_maps.get(target_file, {}), class_registry,
factory_returns, import_maps, repo_path, _seen,
classes_by_file,
)
return None
# bare name
if bare_name in class_registry and relative_file in class_registry[bare_name]:
return f"{relative_file}:{bare_name}"
if bare_name in import_map:
target_file = "./" + os.path.relpath(import_map[bare_name], repo_path)
if bare_name in class_registry:
if target_file in class_registry[bare_name]:
return f"{target_file}:{bare_name}"
if target_file.endswith("__init__.py"):
target_dir = target_file[:-12]
for cand_file in class_registry[bare_name]:
if cand_file.startswith(target_dir + "/"):
return f"{cand_file}:{bare_name}"
elif classes_by_file:
file_classes = classes_by_file.get(target_file, [])
if len(file_classes) == 1:
return f"{target_file}:{file_classes[0]}"
factory_key = f"{target_file}:{bare_name}"
if factory_key in factory_returns:
return _resolve_owner_type(
*factory_returns[factory_key], target_file,
import_maps.get(target_file, {}), class_registry,
factory_returns, import_maps, repo_path, _seen,
classes_by_file,
)
return None
factory_key = f"{relative_file}:{bare_name}"
if factory_key in factory_returns:
return _resolve_owner_type(
*factory_returns[factory_key], relative_file,
import_map, class_registry, factory_returns,
import_maps, repo_path, _seen,
classes_by_file=classes_by_file,
)
return None
def _resolve_owner_of_expr(
node, class_name, relative_file, is_method, attribute_owners,
local_var_types=None, import_map=None, class_registry=None,
factory_returns=None, import_maps=None, repo_path=None,
classes_by_file=None, _cached_resolve=None,
):
if isinstance(node, ast.Name) and node.id == "self" and is_method and class_name:
return f"{relative_file}:{class_name}"
if (
isinstance(node, ast.Name)
and local_var_types
and node.id in local_var_types
and import_map is not None
):
qualifier, bare_name = local_var_types[node.id]
if _cached_resolve:
return _cached_resolve(qualifier, bare_name, relative_file, import_map)
return _resolve_owner_type(
qualifier, bare_name, relative_file, import_map,
class_registry or {}, factory_returns or {},
import_maps or {}, repo_path or "",
classes_by_file=classes_by_file,
)
# Module-level variable fallback: `app = Flask()` at module scope.
# attribute_owners stores these as "{rel_file}:{var_name}".
if isinstance(node, ast.Name) and attribute_owners:
module_key = f"{relative_file}:{node.id}"
if module_key in attribute_owners:
return attribute_owners[module_key]
if isinstance(node, ast.Attribute):
base_owner = _resolve_owner_of_expr(
node.value, class_name, relative_file, is_method, attribute_owners,
local_var_types, import_map, class_registry,
factory_returns, import_maps, repo_path,
classes_by_file, _cached_resolve,
)
if base_owner is None:
return None
return attribute_owners.get(f"{base_owner}.{node.attr}")
return None
def _resolve_via_inheritance(
owner_type, method_name, inheritance, class_registry,
import_maps, function_ids, repo_path, _seen=None,
classes_by_file=None,
):
if _seen is None:
_seen = set()
if owner_type in _seen:
return None
_seen.add(owner_type)
rel_file, class_name = owner_type.split(":", 1)
import_map = import_maps.get(rel_file, {})
for qualifier, base_name in inheritance.get(owner_type, []):
base_owner = _resolve_owner_type(
qualifier, base_name, rel_file, import_map,
class_registry, {}, import_maps, repo_path,
classes_by_file=classes_by_file,
)
if not base_owner:
continue
candidate = f"{base_owner}.{method_name}"
if candidate in function_ids:
return candidate
deeper = _resolve_via_inheritance(
base_owner, method_name, inheritance, class_registry,
import_maps, function_ids, repo_path, _seen,
classes_by_file,
)
if deeper:
return deeper
return None
def _resolve_call(
call_node, is_method, class_name, relative_file,
local_name_to_id, import_map, all_functions, function_ids,
repo_path, attribute_owners, inheritance, class_registry,
factory_returns, import_maps, local_var_types=None,
classes_by_file=None, _cached_resolve=None,
):
func = call_node.func
if isinstance(func, ast.Name):
return _lookup(func.id, local_name_to_id, import_map, all_functions, repo_path)
if isinstance(func, ast.Attribute):
method_name = func.attr
owner_type = _resolve_owner_of_expr(
func.value, class_name, relative_file, is_method, attribute_owners,
local_var_types, import_map, class_registry,
factory_returns, import_maps, repo_path,
classes_by_file, _cached_resolve,
)
if owner_type:
candidate = f"{owner_type}.{method_name}"
if candidate in function_ids:
return candidate
resolved = _resolve_via_inheritance(
owner_type, method_name, inheritance, class_registry,
import_maps, function_ids, repo_path,
classes_by_file=classes_by_file,
)
if resolved:
return resolved
return None
if isinstance(func.value, ast.Name) and func.value.id in import_map:
source_file = import_map[func.value.id]
relative_source = "./" + os.path.relpath(source_file, repo_path)
candidate = f"{relative_source}:{method_name}"
if candidate in all_functions:
return candidate
return None
def _lookup(called_name, local_name_to_id, import_map, all_functions, repo_path):
if called_name in local_name_to_id:
return local_name_to_id[called_name]
if called_name in import_map:
source_file = import_map[called_name]
relative_source = "./" + os.path.relpath(source_file, repo_path)
candidate = f"{relative_source}:{called_name}"
if candidate in all_functions:
return candidate
return None