Source code for dynamo.preprocessing.utils

from anndata import AnnData
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.sparse import issparse, csr_matrix
from sklearn.decomposition import PCA, TruncatedSVD

import warnings
import anndata
from typing import Iterable, Union
from ..dynamo_logger import LoggerManager, main_debug, main_info, main_warning, main_exception
from ..utils import areinstance
from ..configuration import DKM, DynamoAdataKeyManager


# ---------------------------------------------------------------------------------------------------
# symbol conversion related
def convert2gene_symbol(input_names, scopes="ensembl.gene"):
    """Convert ensemble gene id to official gene names using mygene package.

    Parameters
    ----------
        input_names: list-like
            The ensemble gene id names that you want to convert to official gene names. All names should come from the
            same species.
        scopes: `list-like` or `None` (default: `None`)
            Scopes are needed when you use non-official gene name as your gene indices (or adata.var_name). This
            arugument corresponds to type of types of identifiers, either a list or a comma-separated fields to specify
            type of input qterms, e.g. “entrezgene”, “entrezgene,symbol”, [“ensemblgene”, “symbol”]. Refer to official
            MyGene.info docs (https://docs.mygene.info/en/latest/doc/query_service.html#available_fields) for full list
            of fields.

    Returns
    -------
        var_pd: `pd.Dataframe`
            A pandas dataframe that includes the following columns:
            query: the input ensmble ids
            _id: identified id from mygene
            _score: confidence of the retrieved official gene name.
            symbol: retrieved official gene name
    """

    try:
        import mygene
    except ImportError:
        raise ImportError(
            "You need to install the package `mygene` (pip install mygene --user) "
            "See https://pypi.org/project/mygene/ for more details."
        )

    mg = mygene.MyGeneInfo()
    main_info("Storing myGene name info into local cache db: mygene_cache.sqlite.")
    mg.set_caching()

    ensemble_names = [i.split(".")[0] for i in input_names]
    var_pd = mg.querymany(
        ensemble_names,
        scopes=scopes,
        fields="symbol",
        as_dataframe=True,
        df_index=True,
    )
    # var_pd.drop_duplicates(subset='query', inplace=True) # use when df_index is not True
    var_pd = var_pd.loc[~var_pd.index.duplicated(keep="first")]

    return var_pd


[docs]def convert2symbol(adata: AnnData, scopes: Union[str, Iterable, None] = None, subset=True): """This helper function converts unofficial gene names to official gene names.""" if np.all(adata.var_names.str.startswith("ENS")) or scopes is not None: logger = LoggerManager.gen_logger("dynamo-utils") logger.info("convert ensemble name to official gene name", indent_level=1) prefix = adata.var_names[0] if scopes is None: if prefix[:4] == "ENSG" or prefix[:7] == "ENSMUSG": scopes = "ensembl.gene" elif prefix[:4] == "ENST" or prefix[:7] == "ENSMUST": scopes = "ensembl.transcript" else: raise Exception( "Your adata object uses non-official gene names as gene index. \n" "Dynamo finds those IDs are neither from ensembl.gene or ensembl.transcript and thus cannot " "convert them automatically. \n" "Please pass the correct scopes or first convert the ensemble ID to gene short name " "(for example, using mygene package). \n" "See also dyn.pp.convert2gene_symbol" ) adata.var["query"] = [i.split(".")[0] for i in adata.var.index] if scopes is str: adata.var[scopes] = adata.var.index else: adata.var["scopes"] = adata.var.index logger.warning( "Your adata object uses non-official gene names as gene index. \n" "Dynamo is converting those names to official gene names." ) official_gene_df = convert2gene_symbol(adata.var_names, scopes) merge_df = adata.var.merge(official_gene_df, left_on="query", right_on="query", how="left").set_index( adata.var.index ) adata.var = merge_df valid_ind = np.where(merge_df["notfound"] != True)[0] # noqa: E712 if subset is True: adata._inplace_subset_var(valid_ind) adata.var.index = adata.var["symbol"].values.copy() else: indices = np.array(adata.var.index) indices[valid_ind] = adata.var.loc[valid_ind, "symbol"].values.copy() adata.var.index = indices if np.sum(adata.var_names.isnull()) > 0: main_info( "Subsetting adata object and removing Nan columns from adata when converting gene names.", indent_level=1, ) adata._inplace_subset_var(adata.var_names.notnull()) return adata
def compute_gene_exp_fraction(X, threshold=0.001): """Calculate fraction of each gene's count to total counts across cells and identify high fraction genes.""" frac = X.sum(0) / X.sum() if issparse(X): frac = frac.A.reshape(-1, 1) valid_ids = np.where(frac > threshold)[0] return frac, valid_ids # --------------------------------------------------------------------------------------------------- # implmentation of Cooks' distance (but this is for Poisson distribution fitting) # https://stackoverflow.com/questions/47686227/poisson-regression-in-statsmodels-and-r # from __future__ import division, print_function # https://stats.stackexchange.com/questions/356053/the-identity-link-function-does-not-respect-the-domain-of-the-gamma- # family def _weight_matrix(fitted_model): """Calculates weight matrix in Poisson regression Parameters ---------- fitted_model : statsmodel object Fitted Poisson model Returns ------- W : 2d array-like Diagonal weight matrix in Poisson regression """ return np.diag(fitted_model.fittedvalues) def _hessian(X, W): """Hessian matrix calculated as -X'*W*X Parameters ---------- X : 2d array-like Matrix of covariates W : 2d array-like Weight matrix Returns ------- hessian : 2d array-like Hessian matrix """ return -np.dot(X.T, np.dot(W, X)) def _hat_matrix(X, W): """Calculate hat matrix = W^(1/2) * X * (X'*W*X)^(-1) * X'*W^(1/2) Parameters ---------- X : 2d array-like Matrix of covariates W : 2d array-like Diagonal weight matrix Returns ------- hat : 2d array-like Hat matrix """ # W^(1/2) Wsqrt = W ** (0.5) # (X'*W*X)^(-1) XtWX = -_hessian(X=X, W=W) XtWX_inv = np.linalg.inv(XtWX) # W^(1/2)*X WsqrtX = np.dot(Wsqrt, X) # X'*W^(1/2) XtWsqrt = np.dot(X.T, Wsqrt) return np.dot(WsqrtX, np.dot(XtWX_inv, XtWsqrt)) def cook_dist(model, X, good): # Weight matrix W = _weight_matrix(model) # Hat matrix H = _hat_matrix(X, W) hii = np.diag(H) # Diagonal values of hat matrix # fit.get_influence().hat_matrix_diag # Pearson residuals r = model.resid_pearson # Cook's distance (formula used by R = (res/(1 - hat))^2 * hat/(dispersion * p)) # Note: dispersion is 1 since we aren't modeling overdispersion resid = good.disp - model.predict(good) rss = np.sum(resid ** 2) MSE = rss / (good.shape[0] - 2) # use the formula from: https://www.mathworks.com/help/stats/cooks-distance.html cooks_d = r ** 2 / (2 * MSE) * hii / (1 - hii) ** 2 # (r / (1 - hii)) ** 2 * / (1 * 2) return cooks_d # --------------------------------------------------------------------------------------------------- # preprocess utilities
[docs]def filter_genes_by_pattern( adata: anndata.AnnData, patterns: tuple = ("MT-", "RPS", "RPL", "MRPS", "MRPL", "ERCC-"), drop_genes: bool = False, ) -> Union[list, None]: """Utility function to filter mitochondria, ribsome protein and ERCC spike-in genes, etc.""" logger = LoggerManager.gen_logger("dynamo-utils") matched_genes = pd.Series(adata.var_names).str.startswith(patterns).to_list() logger.info( "total matched genes is " + str(sum(matched_genes)), indent_level=1, ) if sum(matched_genes) > 0: if drop_genes: gene_bools = np.ones(adata.n_vars, dtype=bool) gene_bools[matched_genes] = False logger.info( "inplace subset matched genes ... ", indent_level=1, ) # let us ignore the `inplace` parameter in pandas.Categorical.remove_unused_categories warning. with warnings.catch_warnings(): warnings.simplefilter("ignore") adata._inplace_subset_var(gene_bools) logger.finish_progress(progress_name="filter_genes_by_pattern") return None else: logger.finish_progress(progress_name="filter_genes_by_pattern") return matched_genes
def basic_stats(adata): adata.obs["nGenes"], adata.obs["nCounts"] = np.array((adata.X > 0).sum(1)), np.array((adata.X).sum(1)) adata.var["nCells"], adata.var["nCounts"] = np.array((adata.X > 0).sum(0).T), np.array((adata.X).sum(0).T) mito_genes = adata.var_names.str.upper().str.startswith("MT-") if sum(mito_genes) > 0: try: adata.obs["pMito"] = np.array(adata.X[:, mito_genes].sum(1) / adata.obs["nCounts"].values.reshape((-1, 1))) except: # noqa E722 main_exception( "no mitochondria genes detected; looks like your var_names may be corrupted (i.e. " "include nan values). If you don't believe so, please report to us on github or " "via xqiu@wi.mit.edu" ) else: adata.obs["pMito"] = 0 def unique_var_obs_adata(adata): """Function to make the obs and var attribute's index unique""" adata.obs_names_make_unique() adata.var_names_make_unique() return adata def convert_layers2csr(adata): """Function to make the obs and var attribute's index unique""" for key in adata.layers.keys(): adata.layers[key] = csr_matrix(adata.layers[key]) if not issparse(adata.layers[key]) else adata.layers[key] return adata def merge_adata_attrs(adata_ori, adata, attr): def _merge_by_diff(origin_df: pd.DataFrame, diff_df: pd.DataFrame): _columns = set(diff_df.columns).difference(origin_df.columns) new_df = origin_df.merge(diff_df[_columns], how="left", left_index=True, right_index=True) return new_df.loc[origin_df.index, :] if attr == "var": adata_ori.var = _merge_by_diff(adata_ori.var, adata.var) elif attr == "obs": adata_ori.obs = _merge_by_diff(adata_ori.obs, adata.obs) return adata_ori def get_inrange_shared_counts_mask(adata, layers, min_shared_count, count_by="gene"): layers = list(set(layers).difference(["X", "matrix", "ambiguous", "spanning"])) # choose shared counts sum by row or columns based on type: `gene` or `cells` sum_dim_index = None ret_dim_index = None if count_by == "gene": sum_dim_index = 0 ret_dim_index = 1 elif count_by == "cells": sum_dim_index = 1 ret_dim_index = 0 else: raise ValueError("Not supported shared account type") if len(np.array(layers)) == 0: main_warning("No layers exist in adata, skipp filtering by shared counts") return np.repeat(True, adata.shape[ret_dim_index]) layers = np.array(layers)[~pd.DataFrame(layers)[0].str.startswith("X_").values] _nonzeros, _sum = None, None # TODO fix bug: when some layers are sparse and some others are not (mixed sparse and ndarray), if the first one happens to be sparse, # dimension mismatch error will be raised; if the first layer (layers[0]) is not sparse, then the following loop works fine. # also check if layers2csr() function works for layer in layers: main_debug(adata.layers[layer].shape) main_debug("layer: %s" % layer) if issparse(adata.layers[layers[0]]): main_debug("when sparse, layer type:" + str(type(adata.layers[layer]))) _nonzeros = adata.layers[layer] > 0 if _nonzeros is None else _nonzeros.multiply(adata.layers[layer] > 0) else: main_debug("when not sparse, layer type:" + str(type(adata.layers[layer]))) _nonzeros = adata.layers[layer] > 0 if _nonzeros is None else _nonzeros * (adata.layers[layer] > 0) for layer in layers: if issparse(adata.layers[layers[0]]): _sum = ( _nonzeros.multiply(adata.layers[layer]) if _sum is None else _sum + _nonzeros.multiply(adata.layers[layer]) ) else: _sum = ( np.multiply(_nonzeros, adata.layers[layer]) if _sum is None else _sum + np.multiply(_nonzeros, adata.layers[layer]) ) return ( np.array(_sum.sum(sum_dim_index).A1 >= min_shared_count) if issparse(adata.layers[layers[0]]) else np.array(_sum.sum(sum_dim_index) >= min_shared_count) ) def clusters_stats(U, S, clusters_uid, cluster_ix, size_limit=40): """Calculate the averages per cluster If the cluster is too small (size<size_limit) the average of the total is reported instead This function is modified from velocyto in order to reproduce velocyto's DentateGyrus notebook. """ U_avgs = np.zeros((S.shape[1], len(clusters_uid))) S_avgs = np.zeros((S.shape[1], len(clusters_uid))) # avgU_div_avgS = np.zeros((S.shape[1], len(clusters_uid))) # slopes_by_clust = np.zeros((S.shape[1], len(clusters_uid))) for i, uid in enumerate(clusters_uid): cluster_filter = cluster_ix == i n_cells = np.sum(cluster_filter) if n_cells > size_limit: U_avgs[:, i], S_avgs[:, i] = ( U[cluster_filter, :].mean(0), S[cluster_filter, :].mean(0), ) else: U_avgs[:, i], S_avgs[:, i] = U.mean(0), S.mean(0) return U_avgs, S_avgs def get_svr_filter(adata, layer="spliced", n_top_genes=3000, return_adata=False): score_name = "score" if layer in ["X", "all"] else layer + "_score" valid_idx = np.where(np.isfinite(adata.var.loc[:, score_name]))[0] valid_table = adata.var.iloc[valid_idx, :] nth_score = np.sort(valid_table.loc[:, score_name])[::-1][np.min((n_top_genes - 1, valid_table.shape[0] - 1))] feature_gene_idx = np.where(valid_table.loc[:, score_name] >= nth_score)[0][:n_top_genes] feature_gene_idx = valid_idx[feature_gene_idx] if return_adata: adata.var.loc[:, "use_for_pca"] = False adata.var.loc[adata.var.index[feature_gene_idx], "use_for_pca"] = True res = adata else: filter_bool = np.zeros(adata.n_vars, dtype=bool) filter_bool[feature_gene_idx] = True res = filter_bool return res def sz_util( adata, layer, round_exprs, method, locfunc, total_layers=None, CM=None, scale_to=None, ): adata = adata.copy() if layer == "_total_" and "_total_" not in adata.layers.keys(): if total_layers is not None: if not isinstance(total_layers, list): total_layers = [total_layers] if len(set(total_layers).difference(adata.layers.keys())) == 0: total = None for t_key in total_layers: total = adata.layers[t_key] if total is None else total + adata.layers[t_key] adata.layers["_total_"] = total if layer == "raw": CM = adata.raw.X if CM is None else CM elif layer == "X": CM = adata.X if CM is None else CM elif layer == "protein": if "protein" in adata.obsm_keys(): CM = adata.obsm["protein"] if CM is None else CM else: return None, None else: CM = adata.layers[layer] if CM is None else CM if round_exprs: main_info("rounding expression data of layer: %s during size factor calculation" % (layer)) if issparse(CM): CM.data = np.round(CM.data, 0) else: CM = CM.round().astype("int") cell_total = CM.sum(axis=1).A1 if issparse(CM) else CM.sum(axis=1) cell_total += cell_total == 0 # avoid infinity value after log (0) if method in ["mean-geometric-mean-total", "geometric"]: sfs = cell_total / (np.exp(locfunc(np.log(cell_total))) if scale_to is None else scale_to) elif method == "median": sfs = cell_total / (np.nanmedian(cell_total) if scale_to is None else scale_to) elif method == "mean": sfs = cell_total / (np.nanmean(cell_total) if scale_to is None else scale_to) else: raise NotImplementedError(f"This method {method} is not supported!") return sfs, cell_total def get_sz_exprs(adata, layer, total_szfactor=None): if layer == "raw": CM = adata.raw.X szfactors = adata.obs[layer + "Size_Factor"].values[:, None] elif layer == "X": CM = adata.X szfactors = adata.obs["Size_Factor"].values[:, None] elif layer == "protein": if "protein" in adata.obsm_keys(): CM = adata.obsm[layer] szfactors = adata.obs["protein_Size_Factor"].values[:, None] else: CM, szfactors = None, None else: CM = adata.layers[layer] szfactors = adata.obs[layer + "_Size_Factor"].values[:, None] if total_szfactor is not None and total_szfactor in adata.obs.keys(): szfactors = adata.obs[total_szfactor][:, None] elif total_szfactor is not None: main_warning("`total_szfactor` is not `None` and it is not in adata object.") return szfactors, CM def normalize_mat_monocle(mat, szfactors, relative_expr, pseudo_expr, norm_method=np.log1p): if norm_method == np.log1p: pseudo_expr = 0 if relative_expr: mat = mat.multiply(csr_matrix(1 / szfactors)) if issparse(mat) else mat / szfactors if pseudo_expr is None: pseudo_expr = 1 if issparse(mat): mat.data = norm_method(mat.data + pseudo_expr) if norm_method is not None else mat.data if norm_method is not None and norm_method.__name__ == "Freeman_Tukey": mat.data -= 1 else: mat = norm_method(mat + pseudo_expr) if norm_method is not None else mat return mat def Freeman_Tukey(X, inverse=False): if inverse: res = np.sqrt(X) + np.sqrt((X + 1)) else: res = (X ** 2 - 1) ** 2 / (4 * X ** 2) return res def anndata_bytestring_decode(adata_item): for key in adata_item.keys(): df = adata_item[key] if df.dtype.name == "category" and areinstance(df.cat.categories, bytes): cat = [c.decode() for c in df.cat.categories] df.cat.rename_categories(cat, inplace=True) def decode_index(adata_item): if areinstance(adata_item.index, bytes): index = {i: i.decode() for i in adata_item.index} adata_item.rename(index, inplace=True) def decode(adata): decode_index(adata.obs) decode_index(adata.var) anndata_bytestring_decode(adata.obs) anndata_bytestring_decode(adata.var) # --------------------------------------------------------------------------------------------------- # pca
[docs]def pca_monocle( adata: AnnData, X_data=None, n_pca_components: int = 30, pca_key: str = "X", pcs_key: str = "PCs", genes_to_append=None, layer: str = None, return_all=False, ): # only use genes pass filter (based on use_for_pca) to perform dimension reduction. if X_data is None: if "use_for_pca" not in adata.var.keys(): adata.var["use_for_pca"] = True if layer is None: X_data = adata.X[:, adata.var.use_for_pca.values] else: if "X" in layer: X_data = adata.X[:, adata.var.use_for_pca.values] elif "total" in layer: X_data = adata.layers["X_total"][:, adata.var.use_for_pca.values] elif "spliced" in layer: X_data = adata.layers["X_spliced"][:, adata.var.use_for_pca.values] elif "protein" in layer: X_data = adata.obsm["X_protein"] elif type(layer) is str: X_data = adata.layers["X_" + layer][:, adata.var.use_for_pca.values] else: raise ValueError( f"your input layer argument should be either a `str` or a list that includes one of `X`, " f"`total`, `protein` element. `Layer` currently is {layer}." ) cm_genesums = X_data.sum(axis=0) valid_ind = np.logical_and(np.isfinite(cm_genesums), cm_genesums != 0) valid_ind = np.array(valid_ind).flatten() bad_genes = np.where(adata.var.use_for_pca)[0][~valid_ind] if genes_to_append is not None and len(adata.var.index[bad_genes].intersection(genes_to_append)) > 0: raise ValueError( f"The gene list passed to argument genes_to_append contains genes with no expression " f"across cells or non finite values. Please check those genes:" f"{set(bad_genes).intersection(genes_to_append)}!" ) adata.var.iloc[bad_genes, adata.var.columns.tolist().index("use_for_pca")] = False X_data = X_data[:, valid_ind] USE_TRUNCATED_SVD_THRESHOLD = 100000 if adata.n_obs < USE_TRUNCATED_SVD_THRESHOLD: pca = PCA( n_components=min(n_pca_components, X_data.shape[1] - 1), svd_solver="arpack", random_state=0, ) fit = pca.fit(X_data.toarray()) if issparse(X_data) else pca.fit(X_data) X_pca = fit.transform(X_data.toarray()) if issparse(X_data) else fit.transform(X_data) adata.obsm[pca_key] = X_pca adata.uns[pcs_key] = fit.components_.T adata.uns["explained_variance_ratio_"] = fit.explained_variance_ratio_ else: # unscaled PCA fit = TruncatedSVD( n_components=min(n_pca_components + 1, X_data.shape[1] - 1), random_state=0, ) # first columns is related to the total UMI (or library size) X_pca = fit.fit_transform(X_data)[:, 1:] adata.obsm[pca_key] = X_pca adata.uns[pcs_key] = fit.components_.T adata.uns["explained_variance_ratio_"] = fit.explained_variance_ratio_[1:] adata.uns["explained_variance_ratio_"] = fit.explained_variance_ratio_[1:] adata.uns["pca_mean"] = fit.mean_ if hasattr(fit, "mean_") else None if return_all: return adata, fit, X_pca else: return adata
def pca_genes(PCs: list, n_top_genes: int = 100): """[summary] For each gene, if the gene is n_top in some principle component then it is valid. Return all such valid genes. Parameters ---------- PCs : principle components(PC) of PCA n_top_genes : number of gene candidates in EACH PC Returns ------- ret """ valid_genes = np.zeros(PCs.shape[0], dtype=bool) for q in PCs.T: sorted_q = np.sort(np.abs(q))[::-1] is_pc_top_n = np.abs(q) > sorted_q[n_top_genes] valid_genes = np.logical_or(is_pc_top_n, valid_genes) return valid_genes
[docs]def top_pca_genes(adata, pc_key="PCs", n_top_genes=100, pc_components=None, adata_store_key="top_pca_genes"): if pc_key in adata.uns.keys(): Q = adata.uns[pc_key] elif pc_key in adata.varm.keys(): Q = adata.varm[pc_key] else: raise Exception(f"No PC matrix {pc_key} found in neither .uns nor .varm.") if pc_components is not None: if type(pc_components) == int: Q = Q[:, :pc_components] elif type(pc_components) == list: Q = Q[:, pc_components] pcg = pca_genes(Q, n_top_genes=n_top_genes) genes = np.zeros(adata.n_vars, dtype=bool) if "use_for_pca" in adata.var.keys(): genes[adata.var["use_for_pca"]] = pcg else: genes = pcg adata.var[adata_store_key] = genes return adata
def add_noise_to_duplicates(adata, basis="pca"): X_data = adata.obsm["X_" + basis] min_val = abs(X_data).min() n_obs, n_var = X_data.shape while True: _, index = np.unique(X_data, axis=0, return_index=True) duplicated_idx = np.setdiff1d(np.arange(n_obs), index) if len(duplicated_idx) == 0: adata.obsm["X_" + basis] = X_data break else: X_data[duplicated_idx, :] += np.random.normal(0, min_val / 1000, (len(duplicated_idx), n_var)) # --------------------------------------------------------------------------------------------------- # labeling related def collapse_species_adata(adata): """Function to collapse the four species data, will be generalized to handle dual-datasets""" ( only_splicing, only_labeling, splicing_and_labeling, ) = DKM.allowed_layer_raw_names() if np.all([name in adata.layers.keys() for name in splicing_and_labeling]): if only_splicing[0] not in adata.layers.keys(): adata.layers[only_splicing[0]] = adata.layers["su"] + adata.layers["sl"] if only_splicing[1] not in adata.layers.keys(): adata.layers[only_splicing[1]] = adata.layers["uu"] + adata.layers["ul"] if only_labeling[0] not in adata.layers.keys(): adata.layers[only_labeling[0]] = adata.layers["ul"] + adata.layers["sl"] if only_labeling[1] not in adata.layers.keys(): adata.layers[only_labeling[1]] = adata.layers[only_labeling[0]] + adata.layers["uu"] + adata.layers["su"] return adata def detect_experiment_datatype(adata): has_splicing, has_labeling, splicing_labeling, has_protein = ( False, False, False, False, ) layers = adata.layers.keys() if ( len({"ul", "sl", "uu", "su"}.difference(layers)) == 0 or len({"X_ul", "X_sl", "X_uu", "X_su"}.difference(layers)) == 0 ): has_splicing, has_labeling, splicing_labeling = True, True, True elif ( len({"unspliced", "spliced", "new", "total"}.difference(layers)) == 0 or len({"X_unspliced", "X_spliced", "X_new", "X_total"}.difference(layers)) == 0 ): has_splicing, has_labeling = True, True elif ( len({"unspliced", "spliced"}.difference(layers)) == 0 or len({"X_unspliced", "X_spliced"}.difference(layers)) == 0 ): has_splicing = True elif len({"new", "total"}.difference(layers)) == 0 or len({"X_new", "X_total"}.difference(layers)) == 0: has_labeling = True if "protein" in adata.obsm.keys(): has_protein = True return has_splicing, has_labeling, splicing_labeling, has_protein def default_layer(adata): has_splicing, has_labeling, splicing_labeling, _ = detect_experiment_datatype(adata) if has_splicing: if has_labeling: if len(set(adata.layers.keys()).intersection(["new", "total", "spliced", "unspliced"])) == 4: adata = collapse_species_adata(adata) default_layer = ( "M_t" if "M_t" in adata.layers.keys() else "X_total" if "X_total" in adata.layers.keys() else "total" ) else: default_layer = ( "M_s" if "M_s" in adata.layers.keys() else "X_spliced" if "X_spliced" in adata.layers.keys() else "spliced" ) else: default_layer = ( "M_t" if "M_t" in adata.layers.keys() else "X_total" if "X_total" in adata.layers.keys() else "total" ) return default_layer def calc_new_to_total_ratio(adata): """calculate the new to total ratio across cells. Note that NTR for the first time point in degradation approximates gamma/beta.""" if len({"new", "total"}.intersection(adata.layers.keys())) == 2: ntr = adata.layers["new"].sum(1) / adata.layers["total"].sum(1) ntr = ntr.A1 if issparse(adata.layers["new"]) else ntr var_ntr = adata.layers["new"].sum(0) / adata.layers["total"].sum(0) var_ntr = var_ntr.A1 if issparse(adata.layers["new"]) else var_ntr elif len({"uu", "ul", "su", "sl"}.intersection(adata.layers.keys())) == 4: new = adata.layers["ul"].sum(1) + adata.layers["sl"].sum(1) total = new + adata.layers["uu"].sum(1) + adata.layers["su"].sum(1) ntr = new / total ntr = ntr.A1 if issparse(adata.layers["uu"]) else ntr new = adata.layers["ul"].sum(0) + adata.layers["sl"].sum(0) total = new + adata.layers["uu"].sum(0) + adata.layers["su"].sum(0) var_ntr = new / total var_ntr = var_ntr.A1 if issparse(adata.layers["uu"]) else var_ntr elif len({"unspliced", "spliced"}.intersection(adata.layers.keys())) == 2: with warnings.catch_warnings(): warnings.simplefilter("ignore") ntr = adata.layers["unspliced"].sum(1) / (adata.layers["unspliced"] + adata.layers["spliced"]).sum(1) var_ntr = adata.layers["unspliced"].sum(0) / (adata.layers["unspliced"] + adata.layers["spliced"]).sum(0) ntr = ntr.A1 if issparse(adata.layers["unspliced"]) else ntr var_ntr = var_ntr.A1 if issparse(adata.layers["unspliced"]) else var_ntr else: ntr, var_ntr = None, None return ntr, var_ntr
[docs]def scale(adata, layers=None, scale_to_layer=None, scale_to=1e6): """scale layers to a particular total expression value, similar to `normalize_expr_data` function.""" layers = DynamoAdataKeyManager.get_available_layer_keys(adata, layers) has_splicing, has_labeling, _ = detect_experiment_datatype(adata) if scale_to_layer is None: scale_to_layer = "total" if has_labeling else None scale = scale_to / adata.layers[scale_to_layer].sum(1) else: scale = None for layer in layers: if scale is None: scale = scale_to / adata.layers[layer].sum(1) adata.layers[layer] = csr_matrix(adata.layers[layer].multiply(scale)) return adata
# --------------------------------------------------------------------------------------------------- # ERCC related def relative2abs( adata, dilution, volume, from_layer=None, to_layers=None, mixture_type=1, ERCC_controls=None, ERCC_annotation=None, ): """Converts FPKM/TPM data to transcript counts using ERCC spike-in. This is based on the relative2abs function from monocle 2 (Qiu, et. al, Nature Methods, 2017). Parameters ---------- adata: :class:`~anndata.AnnData` an Annodata object dilution: `float` the dilution of the spikein transcript in the lysis reaction mix. Default is 40, 000. The number of spike-in transcripts per single-cell lysis reaction was calculated from. volume: `float` the approximate volume of the lysis chamber (nanoliters). Default is 10 from_layer: `str` or `None` The layer in which the ERCC TPM values will be used as the covariate for the ERCC based linear regression. to_layers: `str`, `None` or `list-like` The layers that our ERCC based transformation will be applied to. mixture_type: the type of spikein transcripts from the spikein mixture added in the experiments. By default, it is mixture 1. Note that m/c we inferred are also based on mixture 1. ERCC_controls: the FPKM/TPM matrix for each ERCC spike-in transcript in the cells if user wants to perform the transformation based on their spike-in data. Note that the row and column names should match up with the ERCC_annotation and relative_exprs_matrix respectively. ERCC_annotation: the ERCC_annotation matrix from illumina USE GUIDE which will be ued for calculating the ERCC transcript copy number for performing the transformation. Returns ------- An adata object with the data specified in the to_layers transformed into absolute counts. """ if ERCC_annotation is None: ERCC_annotation = pd.read_csv( "https://www.dropbox.com/s/cmiuthdw5tt76o5/ERCC_specification.txt?dl=1", sep="\t", ) ERCC_id = ERCC_annotation["ERCC ID"] ERCC_id = adata.var_names.intersection(ERCC_id) if len(ERCC_id) < 10 and ERCC_controls is None: raise Exception("The adata object you provided has less than 10 ERCC genes.") if to_layers is not None: to_layers = [to_layers] if to_layers is str else to_layers to_layers = list(set(adata.layers.keys()).intersection(to_layers)) if len(to_layers) == 0: raise Exception( f"The layers {to_layers} that will be converted to absolute counts doesn't match any layers" f"from the adata object." ) mixture_name = ( "concentration in Mix 1 (attomoles/ul)" if mixture_type == 1 else "concentration in Mix 2 (attomoles/ul)" ) ERCC_annotation["numMolecules"] = ERCC_annotation.loc[:, mixture_name] * ( volume * 10 ** (-3) * 1 / dilution * 10 ** (-18) * 6.02214129 * 10 ** (23) ) ERCC_annotation["rounded_numMolecules"] = ERCC_annotation["numMolecules"].astype(int) if from_layer in [None, "X"]: X, X_ercc = ( adata.X, adata[:, ERCC_id].X if ERCC_controls is None else ERCC_controls, ) else: X, X_ercc = ( adata.layers[from_layer], adata[:, ERCC_id] if ERCC_controls is None else ERCC_controls, ) logged = False if X.max() > 100 else True if not logged: X, X_ercc = ( np.log1p(X.A) if issparse(X_ercc) else np.log1p(X), np.log1p(X_ercc.A) if issparse(X_ercc) else np.log1p(X_ercc), ) else: X, X_ercc = ( X.A if issparse(X_ercc) else X, X_ercc.A if issparse(X_ercc) else X_ercc, ) y = np.log1p(ERCC_annotation["numMolecules"]) for i in range(adata.n_obs): X_i, X_ercc_i = X[i, :], X_ercc[i, :] X_i, X_ercc_i = sm.add_constant(X_i), sm.add_constant(X_ercc_i) res = sm.RLM(y, X_ercc_i).fit() k, b = res.params[::-1] if to_layers is None: X = adata.X logged = False if X.max() > 100 else True if not logged: X_i = np.log1p(X[i, :].A) if issparse(X) else np.log1p(X[i, :]) else: X_i = X[i, :].A if issparse(X) else X[i, :] res = k * X_i + b res = res if logged else np.expm1(res) adata.X[i, :] = csr_matrix(res) if issparse(X) else res else: for cur_layer in to_layers: X = adata.layers[cur_layer] logged = False if X.max() > 100 else True if not logged: X_i = np.log1p(X[i, :].A) if issparse(X) else np.log1p(X[i, :]) else: X_i = X[i, :].A if issparse(X) else X[i, :] res = k * X_i + b if logged else np.expm1(k * X_i + b) adata.layers[cur_layer][i, :] = csr_matrix(res) if issparse(X) else res # --------------------------------------------------------------------------------------------------- # coordinate/vector space operations def affine_transform(X, A, b): X = np.array(X) A = np.array(A) b = np.array(b) return (A @ X.T).T + b def gen_rotation_2d(degree: float): from math import cos, sin, radians rad = radians(degree) R = [ [cos(rad), -sin(rad)], [sin(rad), cos(rad)], ] return np.array(R)