# Source code for sincei.TopicModels

# topic models
import numpy as np
import pandas as pd
from gensim import corpora, matutils, models
import copy

# Louvain clustering and UMAP
from networkx import convert_matrix
from sklearn.metrics import pairwise_distances
import leidenalg as la

import umap
from scanpy._utils import get_igraph_from_adjacency
from scanpy.neighbors import (
    _compute_connectivities_umap,
    _get_indices_distances_from_dense_matrix,
)

### ------ Functions ------


class TOPICMODEL:
    r"""
    Computes LSA or LDA for a given matrix and returns the cell-topic matrix

    Typical usage is to construct the object, call either :meth:`runLSA` or
    :meth:`runLDA` (both store their results on the object and return
    ``None``), and then call :meth:`get_cell_topic` to obtain the result as a
    pandas DataFrame.

    Parameters
    ----------
    mat : scipy.sparse.csr_matrix
        Sparse matrix relating cells and regions, documented upstream as
        having shape (cells, regions). It is passed to
        ``gensim.matutils.Sparse2Corpus`` with default arguments.
        NOTE(review): by default Sparse2Corpus treats every *column* as one
        document -- confirm that callers pass the matrix in the orientation
        that makes each document a cell.
    cells : list
        List of Cell IDs (one per document of the corpus)
    regions : list
        List of Regions (one per feature of the corpus)
    n_topics : int
        Number of Topics / Principal Components
    smart_code : str
        SMART (System for the Mechanical Analysis and Retrieval of Text) code for weighting of input matrix for TFIDF.
        Only valid for the LSA model. The default ("lfu") corresponds to "log"TF * IDF, and "pivoted unique"
        normalization of document length. For more information, see:
        https://en.wikipedia.org/wiki/SMART_Information_Retrieval_System
    n_passes : int
        Forwarded as ``passes`` to the LDA model. Only used by :meth:`runLDA`.
    n_workers : int
        Forwarded as ``workers`` to the LDA model. Only used by :meth:`runLDA`.

    Returns
    -------
    An object of class TOPICMODEL.
    """

    def __init__(
        self,
        mat,
        cells,
        regions,
        n_topics,
        smart_code="lfu",
        n_passes=1,
        n_workers=1,
    ):
        self.n_topics = n_topics
        self.smart_code = smart_code
        self.cells = cells
        # NOTE(review): confirm that the ids gensim assigns to `regions` line up
        # with the feature indices of `mat`.
        self.regions_dict = corpora.dictionary.Dictionary([regions])
        self.corpus = matutils.Sparse2Corpus(mat)
        self.n_passes = n_passes
        self.n_workers = n_workers
        # Everything below is filled in by runLSA() / runLDA().
        self.corpus_tfidf = None
        self.lsi_model = None
        self.lda_model = None
        self.cell_topic_dist = None
        self.topic_region_dist = None
        self.shape = (len(cells), len(regions))

    def runLSA(self):
        r"""
        Computes LSA for the stored corpus and updates the object in place

        The following attributes are set:

        * ``corpus_tfidf`` : TFIDF normalized corpus (weighted with ``smart_code``)
        * ``lsi_model`` : the fitted gensim LSI model
        * ``cell_topic_dist`` : the TFIDF corpus transformed by the LSI model

        The u_mass coherence score of the fitted model is printed.

        Returns
        -------
        None
        """
        # LSA
        tfidf = models.TfidfModel(self.corpus, id2word=self.regions_dict, normalize=True, smartirs=self.smart_code)
        self.corpus_tfidf = tfidf[self.corpus]
        self.lsi_model = models.LsiModel(self.corpus_tfidf, id2word=self.regions_dict, num_topics=self.n_topics)
        # lsi[X] computes U^-1*X, which equals V*S (its shape is num_docs * num_topics).
        self.cell_topic_dist = self.lsi_model[self.corpus_tfidf]

        # Compute Coherence Score
        coherence_model_lsa = models.CoherenceModel(
            model=self.lsi_model, corpus=self.corpus, dictionary=self.regions_dict, coherence="u_mass"
        )
        coherence_lsa = coherence_model_lsa.get_coherence()
        print("\nCoherence Score: ", coherence_lsa)

    def runLDA(self):
        r"""
        Computes an LDA model for the stored corpus and updates the object in place

        The following attributes are set:

        * ``lda_model`` : the fitted gensim LdaMulticore model
        * ``cell_topic_dist`` : the corpus transformed by the LDA model
        * ``topic_region_dist`` : result of ``lda_model.get_topics()``

        Returns
        -------
        None
        """
        self.lda_model = models.LdaMulticore(
            corpus=self.corpus, num_topics=self.n_topics, passes=self.n_passes, workers=self.n_workers
        )
        # get topic distributions for each document
        self.cell_topic_dist = self.lda_model[self.corpus]
        # get topic-word distributions
        self.topic_region_dist = self.lda_model.get_topics()

    def get_cell_topic(self, pop_sparse_cells=False):
        r"""
        Get cell-topic matrix from the updated object

        Cells whose topic list is shorter or longer than the others (which can
        happen when the model omits low-weight topics for a document) are
        either removed or zero-filled, depending on ``pop_sparse_cells``.

        Parameters
        ----------
        pop_sparse_cells : bool
            Only relevant when the cells do not all report the same number of
            topics. If True, cells that do not report exactly ``n_topics``
            topics are removed from the output. If False (default), all cells
            are kept and the weights of unreported topics are set to zero.

        Returns
        -------
        cell_topic : pandas dataframe
            Cell-topic matrix, one row per (retained) cell, indexed by cell ID,
            one column per topic id.

        Raises
        ------
        RuntimeError
            If neither runLSA() nor runLDA() has been called yet.
        """
        if self.cell_topic_dist is None:
            raise RuntimeError("No topic model has been fitted yet; call runLSA() or runLDA() first.")

        # Work on a copy so that self.cells is never modified.
        cells = list(self.cells)
        # Materialise the distribution once: it may be a lazily transformed
        # corpus, and topic ids and weights must come from the same pass.
        docs = [list(doc) for doc in self.cell_topic_dist]
        topic_ids = [[tup[0] for tup in doc] for doc in docs]
        weights = [[tup[1] for tup in doc] for doc in docs]

        # if all documents don't have same set of topics, remove or zero-fill them
        if len({len(row) for row in weights}) > 1:
            sparse_idx = {i for i, row in enumerate(weights) if len(row) != self.n_topics}
            print(
                "{} Cells were detected which don't contribute to all {} topics.".format(
                    len(sparse_idx), self.n_topics
                )
            )
            if pop_sparse_cells:
                print("Removing these cells from the analysis")
                keep = [i for i in range(len(docs)) if i not in sparse_idx]
                cells = [cells[i] for i in keep]
                topic_ids = [topic_ids[i] for i in keep]
                weights = [weights[i] for i in keep]
            else:
                print("Filling the missing topic weights of these cells with zeros")
                # Place every reported weight in the column of its topic id;
                # widen the matrix if an id beyond n_topics is ever reported.
                n_cols = max([self.n_topics] + [tup[0] + 1 for doc in docs for tup in doc])
                dense = np.zeros((len(docs), n_cols))
                for row, doc in enumerate(docs):
                    for tup in doc:
                        dense[row, tup[0]] = tup[1]
                cell_topic = pd.DataFrame(dense, columns=list(range(n_cols)))
                cell_topic.index = cells
                return cell_topic

        if not weights:
            # Nothing (left) to stack: return an empty frame with the expected columns.
            cell_topic = pd.DataFrame(np.empty((0, self.n_topics)), columns=list(range(self.n_topics)))
        else:
            # All remaining rows have equal length; the first row names the topic columns.
            cell_topic = pd.DataFrame(np.stack(weights), columns=topic_ids[0])
        cell_topic.index = cells
        return cell_topic