| import scanpy as sc |
| import numpy as np |
| import pandas as pd |
| import json |
| import os |
| from PIL import Image |
|
|
|
|
|
|
| def generate_gene_df(ad, house_keeping_genes, todense=True): |
| """ |
| Generates a DataFrame with the top 50 genes for each observation in an AnnData object. |
| It removes genes containing '.' or '-' in their names, as well as genes listed in |
| the provided `house_keeping_genes` DataFrame/Series under the 'genesymbol' column. |
| |
| :param ad: An AnnData object containing gene expression data. |
| :type ad: anndata.AnnData |
| :param house_keeping_genes: DataFrame or Series with a 'genesymbol' column listing housekeeping genes to exclude. |
| :type house_keeping_genes: pandas.DataFrame or pandas.Series |
| :param todense: Whether to convert the sparse matrix (ad.X) to a dense matrix before creating a DataFrame. |
| :type todense: bool |
| :return: A DataFrame (`top_k_genes_str`) that contains a 'label' column. Each row in 'label' is a string |
| with the top 50 gene names (space-separated) for that observation. |
| :rtype: pandas.DataFrame |
| """ |
|
|
| |
| ad = ad[:, ~ad.var.index.str.contains('.', regex=False)] |
| |
| ad = ad[:, ~ad.var.index.str.contains('-', regex=False)] |
| |
| ad = ad[:, ~ad.var.index.isin(house_keeping_genes['genesymbol'])] |
|
|
| |
| if todense: |
| expr = pd.DataFrame(ad.X.todense(), index=ad.obs.index, columns=ad.var.index) |
| else: |
| expr = pd.DataFrame(ad.X, index=ad.obs.index, columns=ad.var.index) |
|
|
| |
| top_k_genes = expr.apply(lambda s, n: pd.Series(s.nlargest(n).index), axis=1, n=50) |
|
|
| |
| top_k_genes_str = pd.DataFrame() |
| top_k_genes_str['label'] = top_k_genes[top_k_genes.columns].astype(str) \ |
| .apply(lambda x: ' '.join(x), axis=1) |
|
|
| return top_k_genes_str |
|
|
|
|
|
|
| def segment_patches(img_array, coord, patch_dir, height=20, width=20): |
| """ |
| Extracts small image patches centered at specified coordinates and saves them as individual PNG files. |
| |
| :param img_array: A NumPy array representing the full-resolution image. Shape is expected to be (H, W[, C]). |
| :type img_array: numpy.ndarray |
| :param coord: A pandas DataFrame containing patch center coordinates in columns "pixel_x" and "pixel_y". |
| The index corresponds to spot IDs. Example columns: ["pixel_x", "pixel_y"]. |
| :type coord: pandas.DataFrame |
| :param patch_dir: Directory path where the patch images will be saved. |
| :type patch_dir: str |
| :param height: The patch's height in pixels (distance in the y-direction). |
| :type height: int |
| :param width: The patch's width in pixels (distance in the x-direction). |
| :type width: int |
| :return: None. The function saves image patches to `patch_dir` but does not return anything. |
| """ |
|
|
| |
| if not os.path.exists(patch_dir): |
| os.makedirs(patch_dir) |
|
|
| |
| yrange, xrange = img_array.shape[:2] |
|
|
| |
| for spot_idx in coord.index: |
| |
| ycenter, xcenter = coord.loc[spot_idx, ["pixel_x", "pixel_y"]] |
|
|
| |
| x1 = round(xcenter - width / 2) |
| y1 = round(ycenter - height / 2) |
| x2 = x1 + width |
| y2 = y1 + height |
|
|
| |
| if x1 < 0 or y1 < 0 or x2 > xrange or y2 > yrange: |
| print(f"Patch {spot_idx} is out of range and will be skipped.") |
| continue |
|
|
| |
| patch_img = Image.fromarray(img_array[y1:y2, x1:x2].astype(np.uint8)) |
|
|
| |
| patch_name = f"{spot_idx}_hires.png" |
| patch_path = os.path.join(patch_dir, patch_name) |
|
|
| |
| patch_img.save(patch_path) |
|
|
|
|
|
|
| def read_gct(file_path): |
| """ |
| Reads a GCT file, parses its dimensions, and returns the data as a pandas DataFrame. |
| |
| :param file_path: The path to the GCT file to be read. |
| :return: A pandas DataFrame containing the GCT data, where the first two columns represent gene names and descriptions, |
| and the subsequent columns contain the expression data. |
| """ |
| |
| |
| with open(file_path, 'r') as file: |
| |
| file.readline() |
| |
| |
| dims = file.readline().strip().split() |
| num_rows = int(dims[0]) |
| num_cols = int(dims[1]) |
| |
| |
| |
| data = pd.read_csv(file, sep='\t', header=0, nrows=num_rows) |
| |
| |
| return data |
|
|
|
|
|
|
| def get_library_id(adata): |
| """ |
| Retrieves the library ID from the AnnData object, assuming it contains spatial data. |
| The function will return the first library ID found in `adata.uns['spatial']`. |
| |
| :param adata: AnnData object containing spatial information in `adata.uns['spatial']`. |
| :return: The first library ID found in `adata.uns['spatial']`. |
| :raises: |
| AssertionError: If 'spatial' is not present in `adata.uns`. |
| Logs an error if no library ID is found. |
| """ |
| |
| |
| assert 'spatial' in adata.uns, "spatial not present in adata.uns" |
| |
| |
| library_ids = adata.uns['spatial'].keys() |
| |
| try: |
| |
| library_id = list(library_ids)[0] |
| return library_id |
| except IndexError: |
| |
| logger.error('No library_id found in adata') |
|
|
|
|
|
|
| def get_scalefactors(adata, library_id=None): |
| """ |
| Retrieves the scalefactors from the AnnData object for a given library ID. If no library ID is provided, |
| the function will automatically retrieve the first available library ID. |
| |
| :param adata: AnnData object containing spatial data and scalefactors in `adata.uns['spatial']`. |
| :param library_id: The library ID for which the scalefactors are to be retrieved. If not provided, it defaults to the first available ID. |
| :return: A dictionary containing scalefactors for the specified library ID. |
| """ |
| |
| |
| if library_id is None: |
| library_id = get_library_id(adata) |
| |
| try: |
| |
| scalef = adata.uns['spatial'][library_id]['scalefactors'] |
| return scalef |
| except KeyError: |
| |
| logger.error('scalefactors not found in adata') |
|
|
|
|
|
|
| def get_spot_diameter_in_pixels(adata, library_id=None): |
| """ |
| Retrieves the spot diameter in pixels from the AnnData object's scalefactors for a given library ID. |
| If no library ID is provided, the function will automatically retrieve the first available library ID. |
| |
| :param adata: AnnData object containing spatial data and scalefactors in `adata.uns['spatial']`. |
| :param library_id: The library ID for which the spot diameter is to be retrieved. If not provided, defaults to the first available ID. |
| |
| :return: The spot diameter in full resolution pixels, or None if not found. |
| """ |
| |
| |
| scalef = get_scalefactors(adata, library_id=library_id) |
| |
| try: |
| |
| spot_diameter = scalef['spot_diameter_fullres'] |
| return spot_diameter |
| except TypeError: |
| |
| pass |
| except KeyError: |
| |
| logger.error('spot_diameter_fullres not found in adata') |
|
|
|
|
|
|
| def prepare_data_for_alignment(data_path, scale_type='tissue_hires_scalef'): |
| """ |
| Prepares data for alignment by reading an AnnData object and preparing the high-resolution tissue image. |
| |
| :param data_path: The path to the AnnData (.h5ad) file containing the Visium data. |
| :param scale_type: The type of scale factor to use (`tissue_hires_scalef` by default). |
| |
| :return: |
| - ad: AnnData object containing the spatial transcriptomics data. |
| - ad_coor: Numpy array of scaled spatial coordinates (adjusted for the specified resolution). |
| - img: High-resolution tissue image, normalized to 8-bit unsigned integers. |
| |
| :raises: |
| ValueError: If required data (e.g., scale factors, spatial coordinates, or images) is missing. |
| """ |
| |
| |
| ad = sc.read_h5ad(data_path) |
| |
| |
| ad.var_names_make_unique() |
| |
| try: |
| |
| scalef = get_scalefactors(ad)[scale_type] |
| except KeyError: |
| raise ValueError(f"Scale factor '{scale_type}' not found in ad.uns['spatial']") |
| |
| |
| try: |
| ad_coor = np.array(ad.obsm['spatial']) * scalef |
| except KeyError: |
| raise ValueError("Spatial coordinates not found in ad.obsm['spatial']") |
| |
| |
| try: |
| img = ad.uns['spatial'][get_library_id(ad)]['images']['hires'] |
| except KeyError: |
| raise ValueError("High-resolution image not found in ad.uns['spatial']") |
| |
| |
| if img.max() < 1.1: |
| img = (img * 255).astype('uint8') |
| |
| return ad, ad_coor, img |
|
|
|
|
|
|
| def load_data_for_annotation(st_data_path, json_path, in_tissue=True): |
| """ |
| Loads spatial transcriptomics (ST) data from an .h5ad file and prepares it for annotation. |
| |
| :param sample_type: The type or category of the sample (used to locate the data in the directory structure). |
| :param sample_name: The name of the sample (used to locate specific files). |
| :param in_tissue: Boolean flag to filter the data to include only spots that are in tissue. Default is True. |
| |
| :return: |
| - st_ad: AnnData object containing the spatial transcriptomics data, with spatial coordinates in `obs`. |
| - library_id: The library ID associated with the spatial data. |
| - roi_polygon: Region of interest polygon loaded from a JSON file for further annotation or analysis. |
| """ |
|
|
| |
| st_ad = sc.read_h5ad(st_data_path) |
| |
| |
| if in_tissue: |
| st_ad = st_ad[st_ad.obs['in_tissue'] == 1] |
| |
| |
| st_ad.obs[["pixel_y", "pixel_x"]] = None |
| st_ad.obs[["pixel_y", "pixel_x"]] = st_ad.obsm['spatial'] |
| |
| |
| library_id = get_library_id(st_ad) |
| |
| |
| with open(json_path) as f: |
| roi_polygon = json.load(f) |
|
|
| return st_ad, library_id, roi_polygon |
|
|
|
|
|
|
| def read_polygons(file_path, slide_id): |
| """ |
| Reads polygon data from a JSON file for a specific slide ID, extracting coordinates, colors, and thickness. |
| |
| :param file_path: Path to the JSON file containing polygon configurations. |
| :param slide_id: Identifier for the specific slide whose polygon data is to be extracted. |
| :return: |
| - polygons: A list of numpy arrays, where each array contains the coordinates of a polygon. |
| - polygon_colors: A list of color values corresponding to each polygon. |
| - polygon_thickness: A list of thickness values for each polygon's border. |
| """ |
|
|
| |
| with open(file_path, 'r') as f: |
| polygons_configs = json.load(f) |
|
|
| |
| if slide_id not in polygons_configs: |
| return None, None, None |
|
|
| |
| polygons = [np.array(poly['coords']) for poly in polygons_configs[slide_id]] |
| polygon_colors = [poly['color'] for poly in polygons_configs[slide_id]] |
| polygon_thickness = [poly['thickness'] for poly in polygons_configs[slide_id]] |
|
|
| |
| return polygons, polygon_colors, polygon_thickness |
|
|
|
|
|
|