# Source code for sincei.TopicModels

# topic models
import numpy as np
import pandas as pd
from gensim import corpora, matutils, models
import copy
from sklearn.preprocessing import binarize

### ------ Functions ------


class TOPICMODEL:
    r"""
    Computes LSA or LDA for a given matrix and returns the cell-topic matrix.

    Parameters
    ----------
    adata : AnnData
        AnnData object containing the data matrix in adata.X, with cells in
        adata.obs_names and regions in adata.var_names.

    n_topics : int
        Number of Topics / Principal Components for modeling.

    binarize : bool, optional
        If True, the input matrix will be binarized (default is False).
        Recommended for LDA.

    smart_code : str
        SMART (System for the Mechanical Analysis and Retrieval of Text) code
        for weighting of input matrix for TFIDF. Only valid for the LSA model.
        The default ("lfu") corresponds to "log"TF * IDF, and "pivoted unique"
        normalization of document length. For more information, see:
        https://en.wikipedia.org/wiki/SMART_Information_Retrieval_System

    n_passes : int, optional
        Number of passes for the LDA model. Default is 1.

    n_workers : int, optional
        Number of workers for the LDA model. Default is 1.

    Attributes
    ----------
    cells : list
        Cell names taken from ``adata.obs_names``.
    regions_dict : gensim.corpora.Dictionary
        Dictionary built from ``adata.var_names``.
    corpus : gensim.matutils.Sparse2Corpus
        Corpus view of the (optionally binarized) transposed matrix.
    shape : tuple
        ``adata.shape`` at construction time.
    corpus_tfidf, lsi_model, lda_model, cell_topic_dist, topic_region_dist
        Results; ``None`` until ``runLSA`` / ``runLDA`` populate them.
    """

    def __init__(
        self,
        adata,
        n_topics,
        binarize=False,
        smart_code="lfu",
        n_passes=1,
        n_workers=1,
    ):
        self.cells = adata.obs_names.to_list()
        # NOTE(review): gensim's Dictionary assigns token ids itself; confirm
        # that those ids line up with the column order of adata.var_names.
        self.regions_dict = corpora.dictionary.Dictionary([adata.var_names.to_list()])

        # Transposed copy: regions become rows and cells become columns
        # (Sparse2Corpus presumably treats each column as one document by
        # default -- see the gensim documentation).
        mtx = adata.X.copy().transpose()
        if binarize:
            # The ``binarize`` flag shadows the scikit-learn function of the
            # same name inside this method, so import the function under an
            # alias instead of calling the boolean.
            from sklearn.preprocessing import binarize as binarize_matrix

            mtx = binarize_matrix(mtx, copy=True)

        self.corpus = matutils.Sparse2Corpus(mtx)
        self.shape = adata.shape
        self.n_topics = n_topics
        self.smart_code = smart_code
        self.n_passes = n_passes
        self.n_workers = n_workers

        # Results, filled in by runLSA() / runLDA().
        self.corpus_tfidf = None
        self.lsi_model = None
        self.lda_model = None
        self.cell_topic_dist = None
        self.topic_region_dist = None

    def runLSA(self):
        r"""
        Computes LSA for a given matrix and updates the ``TOPICMODEL`` object.

        Sets ``corpus_tfidf``, ``lsi_model`` and ``cell_topic_dist`` and prints
        the "u_mass" coherence score of the fitted model.
        """
        # TF-IDF weighting of the corpus, using the configured SMART code
        tfidf = models.TfidfModel(
            self.corpus,
            id2word=self.regions_dict,
            normalize=True,
            smartirs=self.smart_code,
        )
        self.corpus_tfidf = tfidf[self.corpus]

        # LSA
        self.lsi_model = models.LsiModel(
            self.corpus_tfidf,
            id2word=self.regions_dict,
            num_topics=self.n_topics,
        )
        # lsi[X] computes U^-1*X, which equals V*S
        # (its shape is num_docs * num_topics).
        self.cell_topic_dist = self.lsi_model[self.corpus_tfidf]

        # Compute Coherence Score
        coherence_model_lsa = models.CoherenceModel(
            model=self.lsi_model,
            corpus=self.corpus,
            dictionary=self.regions_dict,
            coherence="u_mass",
        )
        coherence_lsa = coherence_model_lsa.get_coherence()
        print("\nCoherence Score: ", coherence_lsa)

    def runLDA(self):
        r"""
        Computes LDA model for a given matrix and updates the ``TOPICMODEL`` object.

        Sets ``lda_model``, ``cell_topic_dist`` and ``topic_region_dist``.
        """
        self.lda_model = models.LdaMulticore(
            corpus=self.corpus,
            num_topics=self.n_topics,
            passes=self.n_passes,
            workers=self.n_workers,
        )
        # get topic distributions for each document
        self.cell_topic_dist = self.lda_model[self.corpus]
        # get topic-word distributions
        self.topic_region_dist = self.lda_model.get_topics()

    def get_cell_topic(self, pop_sparse_cells=False):
        r"""
        Get cell-topic matrix from the ``TOPICMODEL`` object.

        Parameters
        ----------
        pop_sparse_cells : bool, optional
            Controls what happens to cells that do not have a value for every
            one of the ``n_topics`` topics. If True, those cells are dropped
            from the result. If False (default), they are kept and their
            missing topics are filled with 0.

        Returns
        -------
        cell_topic : pandas dataframe
            Cell-topic matrix, indexed by cell name, with one ``topic_<i>``
            column per topic.
        """
        n_topics = self.n_topics
        columns = [f"topic_{x}" for x in range(n_topics)]
        cells = list(self.cells)

        # Materialise the distribution exactly once. It may be a lazily
        # evaluated corpus, and walking it separately for ids and for values
        # could yield two inconsistent views of the same document.
        rows = [list(doc) for doc in self.cell_topic_dist]

        # A cell is "sparse" when it lacks a value for at least one topic.
        sparse_idx = {i for i, row in enumerate(rows) if len(row) != n_topics}
        if sparse_idx:
            print(f"{len(sparse_idx)} cells were detected which don't contribute to all {n_topics} topics.")
            if pop_sparse_cells:
                print("Removing these cells from the analysis")
                cells = [c for i, c in enumerate(cells) if i not in sparse_idx]
                rows = [r for i, r in enumerate(rows) if i not in sparse_idx]

        if sparse_idx and not pop_sparse_cells:
            # Keep every cell: place each value in the column named by its
            # topic id and leave the missing topics at zero.
            values = np.zeros((len(rows), n_topics))
            for i, row in enumerate(rows):
                for topic_id, weight in row:
                    values[i, topic_id] = weight
        elif rows:
            # Every remaining row has exactly n_topics entries.
            values = np.stack([[weight for _, weight in row] for row in rows])
        else:
            # Nothing left (no cells, or all were removed): empty frame that
            # still carries the expected topic columns.
            values = np.empty((0, n_topics))

        cell_topic = pd.DataFrame(values, columns=columns)
        cell_topic.index = cells
        return cell_topic