# Source code for unipy_nlp.analyze.topic_modeling

# -*- coding: utf-8 -*-
"""Topic Modeling(LDA) & Word2Vec.
"""

import os
import re
import sys
import json
import zipfile
import urllib
import random
import warnings
import subprocess
import itertools as it
import functools as ft
import collections
from glob import glob
from pprint import pprint
import numpy as np
import pandas as pd

import gensim
import pyLDAvis
import pyLDAvis.gensim as gensimvis

import unidecode
from unicodedata import normalize

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm


__all__ = []
__all__ += [
    # 'compute_coherence_values',
    # 'pick_best_topics',
    # 'groupby_top_n',
    # 'get_terminfo_table',
    'TopicModeler',
]

def compute_coherence_values(
        dictionary,
        corpus,
        id2word,
        texts,
        num_topic_list=None,
        lda_type='default',  # {'default', 'hdp', 'mallet'}
        workers_n=2,
        random_seed=1,
        ):
    """
    Train one topic model per topic number and compute its c_v coherence.

    Parameters:
    ----------
    dictionary : Gensim dictionary
        Accepted for call compatibility; not used by this function
        (``id2word`` is what gets passed to the models).
    corpus : Gensim corpus
        Bag-of-words corpus used for training.
    id2word : Gensim dictionary
        Id-to-token mapping handed to both the topic model and the
        coherence model.
    texts : List of input texts
        Tokenized texts handed to ``gensim.models.CoherenceModel``.
    num_topic_list : list of int (default: ``[5, 10]``)
        Topic numbers to train; one model is trained per entry.
    lda_type : str (default: ``'default'``)
        One of ``{'default', 'hdp', 'mallet'}``.
    workers_n : int (default: 2)
        ``workers`` argument of ``gensim.models.LdaMulticore``
        (only used when ``lda_type='default'``).
    random_seed : int (default: 1)
        When truthy, seeds ``random`` and ``numpy.random``. It is also
        passed as ``random_state`` to the default and HDP models.

    Returns:
    -------
    model_list : List of trained topic models
    coherence_list : Coherence values, index-aligned with ``model_list``

    Raises:
    ------
    ValueError
        If ``lda_type`` is not one of the supported values.
    """
    if lda_type not in ('default', 'hdp', 'mallet'):
        raise ValueError(
            "`lda_type` must be one of {'default', 'hdp', 'mallet'}, "
            f"got {lda_type!r}."
        )

    # `None` sentinel instead of a shared mutable default list.
    if num_topic_list is None:
        num_topic_list = [5, 10]

    model_list = []
    coherence_list = []

    def _coherence_of(model):
        # Same c_v coherence setup for every model type.
        coherence_model = gensim.models.CoherenceModel(
            model=model,
            texts=texts,
            dictionary=id2word,
            coherence='c_v',
        )
        return coherence_model.get_coherence()

    if random_seed:
        random.seed(random_seed)
        np.random.seed(random_seed)

    if lda_type == 'default':
        for num_topics in num_topic_list:
            model = gensim.models.LdaMulticore(
                corpus,
                num_topics=num_topics,
                id2word=id2word,
                passes=2,
                workers=workers_n,
                eta='symmetric',
                decay=.8,  # {.5, 1.}
                per_word_topics=False,
                offset=1.,
                iterations=30,
                gamma_threshold=.001,  # 0.001,
                minimum_probability=.05,  # .01,
                minimum_phi_value=.01,
                random_state=random_seed,
            )
            model_list.append(model)
            coherence_list.append(_coherence_of(model))

    elif lda_type == 'hdp':
        for num_topics in num_topic_list:
            model = gensim.models.HdpModel(
                corpus,
                id2word=id2word,
                T=3,
                K=num_topics,
                random_state=random_seed,
            )
            model_list.append(model)
            coherence_list.append(_coherence_of(model))

    else:  # lda_type == 'mallet'
        # Download File: http://mallet.cs.umass.edu/dist/mallet-2.0.8.zip
        mallet_url = 'http://mallet.cs.umass.edu/dist/mallet-2.0.8.zip'
        mallet_filename = mallet_url.split('/')[-1]
        mallet_unzipped_dirname = mallet_filename.split('.zip')[0]
        mallet_path = f'{mallet_unzipped_dirname}/bin/mallet'

        if not os.path.exists(mallet_path):
            # A bare `import urllib` does not guarantee that the
            # `urllib.request` submodule is loaded, so import it here.
            import urllib.request

            # Download the archive, then unpack it next to the script.
            urllib.request.urlretrieve(mallet_url, mallet_filename)

            # NOTE(review): if the archive already contains a top-level
            # `mallet-2.0.8/` folder, the binary ends up one level deeper
            # than `mallet_path` - confirm against the real archive.
            with zipfile.ZipFile(mallet_filename, "r") as zip_ref:
                zip_ref.extractall(mallet_unzipped_dirname)

        for num_topics in num_topic_list:
            model = gensim.models.wrappers.LdaMallet(
                mallet_path,
                corpus=corpus,
                num_topics=num_topics,
                id2word=id2word,
            )
            model_list.append(model)
            # Keep `coherence_list` aligned with `model_list`; callers zip
            # the two lists together.
            coherence_list.append(_coherence_of(model))

    return model_list, coherence_list


# Print the coherence scores
def pick_best_topics(
        dictionary,
        corpus,
        texts,
        num_topic_list=None,
        lda_type='default',
        workers_n=2,
        random_seed=1,
        ):
    """
    Train models for several topic numbers and pick the most coherent one.

    Parameters
    ----------
    dictionary : Gensim dictionary
        Passed to `compute_coherence_values` both as ``dictionary`` and
        as ``id2word``.
    corpus : Gensim corpus
    texts : List of input texts
    num_topic_list : list of int (default: ``[5, 7, 10, 12, 15, 17, 20]``)
        Topic numbers to try.
    lda_type : str (default: ``'default'``)
    workers_n : int (default: 2)
    random_seed : int (default: 1)

    Returns
    -------
    best_model :
        The model with the highest coherence value (the earliest one on
        ties).
    model_coh_list : list
        ``(topic_num, model, rounded_coherence)`` tuples in training order.
    model_dict : dict
        ``{topic_num: {'model': model, 'coherence': rounded_coherence}}``.
    coherence_value_list : list
        The unrounded coherence values in training order.

    Raises
    ------
    ValueError
        If no model was trained, or the model and coherence lists differ
        in length.
    """
    # `None` sentinel instead of a shared mutable default list.
    if num_topic_list is None:
        num_topic_list = [5, 7, 10, 12, 15, 17, 20]

    model_list, coherence_value_list = compute_coherence_values(
        dictionary=dictionary,
        corpus=corpus,
        id2word=dictionary,
        texts=texts,
        num_topic_list=num_topic_list,
        lda_type=lda_type,
        workers_n=workers_n,
        random_seed=random_seed,
    )

    # Fail with a readable message instead of an IndexError further down,
    # and instead of `zip` silently truncating mismatched lists.
    if not model_list or len(model_list) != len(coherence_value_list):
        raise ValueError(
            'No (model, coherence) pairs to choose from: got '
            f'{len(model_list)} model(s) and '
            f'{len(coherence_value_list)} coherence value(s).'
        )

    # `max` returns the first maximal pair, same as a stable descending sort.
    best_model, _ = max(
        zip(model_list, coherence_value_list),
        key=lambda pair: pair[1],
    )

    model_coh_list = []
    model_dict = {}
    for i, (m, cv) in enumerate(zip(model_list, coherence_value_list)):
        topic_num = m.num_topics
        coh_value = round(cv, 4)
        print(
            f'[{i}] Num Topics ({topic_num:2})' +
            f' has Coherence Value of {coh_value}'
        )
        model_coh_list.append((topic_num, m, coh_value))
        # A later model with the same topic number overwrites an earlier one.
        model_dict[topic_num] = {'model': m, 'coherence': coh_value}

    print(f'Best N topics: {best_model.num_topics}')

    return best_model, model_coh_list, model_dict, coherence_value_list


def get_saliency(tinfo_df):
    r"""Return the per-row ratio of ``Freq`` to ``Total``.

    The value computed here is

    .. math::
        \frac{Freq}{Total}

    for every row of the term-info table.

    NOTE(review): Chuang et al. (2012, "Termite: Visualization techniques
    for assessing textual topic models") define saliency as
    :math:`P(w) \times distinctiveness(w)`; this function only computes the
    frequency ratio above - confirm this is the intended score.

    Parameters
    ----------
    tinfo_df: pandas.DataFrame
        A table with at least the columns ``'Freq'`` and ``'Total'``
        (e.g. `pyLDAvis.gensim.prepare`.to_dict()['tinfo']).

    Return
    ------
    saliency: pandas.Series
        Element-wise ``Freq / Total``, indexed like `tinfo_df`.

    """
    term_freq = tinfo_df['Freq']
    term_total = tinfo_df['Total']

    return term_freq / term_total


def get_relevance(tinfo_df, lambda_val=.6):
    r"""Calculate term relevance with a given lambda value.

    .. math::
        relevance(t,w) = \lambda \cdot \log P(w \vert t)
            + (1 - \lambda) \cdot \log \frac{P(w \vert t)}{P(w)}

    The two logarithmic terms are read from the ``'logprob'`` and
    ``'loglift'`` columns respectively.
    (Sievert, C., 2014. LDAvis: A method for visualizing and interpreting
    topics)

    Parameters
    ----------
    tinfo_df: pandas.DataFrame
        A table with at least the columns ``'logprob'`` and ``'loglift'``
        (e.g. `pyLDAvis.gensim.prepare`.to_dict()['tinfo']).

    lambda_val: float
        Weight of ``logprob`` versus ``loglift``, between 0 and 1.
        Default is .6 (recommended by the paper).

    Return
    ------
    relevance: pandas.Series
        Element-wise relevance, indexed like `tinfo_df`.

    """
    # The original body referenced an undefined name `l` here.
    log_prob_part = lambda_val * tinfo_df['logprob']
    log_lift_part = (1 - lambda_val) * tinfo_df['loglift']

    return log_prob_part + log_lift_part


def groupby_top_n(
        dataframe,
        group_by=None,
        order_by=None,
        ascending=False,
        n=5,
        ):
    """Return the first `n` rows of every group after sorting each group.

    Parameters
    ----------
    dataframe: pandas.DataFrame
        The table to split.
    group_by: list
        Column label(s) to group on. These columns are dropped from the
        columns handed to the per-group sort.
    order_by: list
        Column label(s) each group is sorted by.
    ascending: bool (default: `False`)
        Sort direction within each group.
    n: int (default: `5`)
        Number of rows kept per group.

    Return
    ------
    res_df: pandas.DataFrame
        The result of `groupby(...).apply(...)` over the non-grouping
        columns.
    """

    def _head_of_sorted(group_df):
        # Order a single group, then keep its leading `n` rows.
        ranked = group_df.sort_values(order_by, ascending=ascending)
        return ranked.head(n)

    value_columns = dataframe.columns.drop(group_by)
    grouped = dataframe.groupby(group_by)

    return grouped[value_columns].apply(_head_of_sorted)


def _df_with_names(data, index_name, columns_name):
    """A renaming function from `pyLDAvis._prepare`.
    """
    if type(data) == pd.DataFrame:
        # we want our index to be numbered
        df = pd.DataFrame(data.values)
    else:
        df = pd.DataFrame(data)
    df.index.name = index_name
    df.columns.name = columns_name
    return df


def _series_with_name(data, name):
    """A renaming function from `pyLDAvis._prepare`.
    """
    if type(data) == pd.Series:
        data.name = name
        # ensures a numeric index
        return data.reset_index()[name]
    else:
        return pd.Series(data, name=name)


def get_terminfo_table(
        lda_model,
        corpus: list = None,
        dictionary: gensim.corpora.dictionary.Dictionary = None,
        doc_topic_dists=None,
        use_gensim_prepared=True,
        top_n=10,
        workers_n=-1,
        r_normalized=False,
        relevence_lambda_val=.6,
        random_seed=1,
        ):
    """Build term-info tables and per-document topic scores for a model.

    Runs `pyLDAvis.gensim.prepare`, derives saliency / relevance columns
    from its term-info table, and scores every bag-of-words document
    against every topic.

    Parameters
    ----------
    lda_model:
        The trained topic model handed to `pyLDAvis.gensim.prepare`.
    corpus: list
        Bag-of-words documents, each a list of ``(term_id, count)`` pairs.
        Entries equal to ``None`` or ``[]`` get an equal score per topic.
    dictionary: gensim.corpora.dictionary.Dictionary
        The dictionary the corpus was built with.
    doc_topic_dists:
        Currently ignored; ``None`` is always passed downstream.
    use_gensim_prepared: bool (default: `True`)
        Only `True` is implemented (see Raises).
    top_n: int (default: `10`)
        Number of most relevant terms kept per category.
    workers_n: int (default: `-1`)
        ``n_jobs`` argument of `pyLDAvis.gensim.prepare`.
    r_normalized: bool (default: `False`)
        Score documents with ``term_r_adj_prob`` (normalised within each
        category) instead of ``term_r_prob``.
    relevence_lambda_val: float (default: `.6`)
        Lambda passed to `get_relevance`.
    random_seed: int (default: `1`)
        When truthy, seeds ``random`` and ``numpy.random``.

    Return
    ------
    _prepared:
        The object returned by `pyLDAvis.gensim.prepare`.
    tinfo_df: pandas.DataFrame
        The term-info table with the derived columns added.
    relevant_terms_df: pandas.DataFrame
        Top `top_n` rows per category by relevance, with a ``rank`` column.
    r_score_df: pandas.DataFrame
        Relevance-based scores indexed by ``(category_num, term_id)``.
    bow_score_list: list
        One list of ``(topic, score)`` pairs per document in `corpus`.

    Raises
    ------
    NotImplementedError
        If `use_gensim_prepared` is false: that branch never builds the
        return values.
    """

    if random_seed:
        random.seed(random_seed)
        np.random.seed(random_seed)

    if use_gensim_prepared:
        _prepared = gensimvis.prepare(
            topic_model=lda_model,
            corpus=corpus,
            dictionary=dictionary,
            doc_topic_dist=None,
            R=len(dictionary),
            # lambda_step=0.2,
            mds='tsne',
            # mds=<function js_PCoA>,
            n_jobs=workers_n,
            plot_opts={'xlab': 'PC1', 'ylab': 'PC2'},
            sort_topics=True,
        )
        tinfo_df = pd.DataFrame(_prepared.to_dict()['tinfo'])

        tinfo_df['topic_term_dists'] = np.exp(tinfo_df['logprob'])
        tinfo_df['term_proportion'] = (
            np.exp(tinfo_df['logprob']) / np.exp(tinfo_df['loglift'])
        )
        tinfo_df['saliency'] = get_saliency(tinfo_df)
        tinfo_df['relevance'] = get_relevance(
            tinfo_df,
            lambda_val=relevence_lambda_val,
        )

        tinfo_df['term_prob'] = np.exp(tinfo_df['logprob'])
        tinfo_df['term_r_prob'] = np.exp(tinfo_df['relevance'])
        # `transform` keeps the original row index, so the result can be
        # assigned back as a column regardless of how the installed pandas
        # labels the output of `groupby.apply`.
        tinfo_df['term_r_adj_prob'] = (
            tinfo_df
            .groupby(['Category'])
            ['term_r_prob']
            .transform(lambda x: x / x.sum())
        )

        if r_normalized:
            r_colname = 'term_r_adj_prob'
        else:
            r_colname = 'term_r_prob'

        relevance_score_df = (
            tinfo_df[tinfo_df['Category'] != 'Default']
            .groupby(['Category', 'Term'])
            [[r_colname]]
            .sum()
            .reset_index()
        )

        # Map every term back to its id in the gensim dictionary.
        corpus_dict_df = pd.DataFrame(
            list(dictionary.items()),
            columns=['term_id', 'Term'],
        )
        corpus_dict_df.set_index('term_id', drop=False, inplace=True)

        r_score_df = pd.merge(
            relevance_score_df,
            corpus_dict_df,
            on=['Term'],
            how='left',
        )
        # 'TopicN' labels become zero-based topic numbers.
        r_score_df['category_num'] = (
            r_score_df['Category']
            .str
            .replace('Topic', '')
            .astype(int) - 1
        ).astype('category')
        r_score_df.set_index(['category_num', 'term_id'], inplace=True)
        ixs = pd.IndexSlice

        topic_list = r_score_df.index.levels[0]
        equal_prob = 1. / len(topic_list)
        # Fallback for empty documents: the same score for every topic.
        empty_bow_case_list = list(
            zip(topic_list, [equal_prob] * len(topic_list))
        )

        def get_bow_score(
                bow_chunk,
                score_df=r_score_df,
                colname=r_colname,
                ):

            bow_chunk_arr = np.array(bow_chunk)
            word_id_arr = bow_chunk_arr[:, 0]
            word_cnt_arr = bow_chunk_arr[:, 1]

            # Cap repeated words so one frequent token cannot dominate.
            clipped_word_cnt_arr = np.clip(word_cnt_arr, 0, 3)

            # NOTE(review): the dot product assumes every topic holds a
            # score for each word id, in the order of `word_id_arr` -
            # confirm against the `R=len(dictionary)` setting above.
            score_series = (score_df.loc[ixs[:, word_id_arr], :]
                            .groupby(level=0)
                            [colname]
                            .apply(lambda x: x @ clipped_word_cnt_arr)
                            )
            # `Series.iteritems` was removed in pandas 2.0; `items` is the
            # equivalent available on old and new versions.
            score_list = list(score_series.items())

            return score_list

        bow_score_list = [
            get_bow_score(bow_chunk)
            if bow_chunk not in (None, [])
            else empty_bow_case_list
            for bow_chunk in corpus
        ]

        relevant_terms_df = groupby_top_n(
            tinfo_df,
            group_by=['Category'],
            order_by=['relevance'],
            ascending=False,
            n=top_n,
        )
        relevant_terms_df['rank'] = (
            relevant_terms_df
            .groupby(['Category'])
            ['relevance']
            .rank(method='max', ascending=False)
            .astype(int)
        )

    else:
        vis_attr_dict = gensimvis._extract_data(
            topic_model=lda_model,
            corpus=corpus,
            dictionary=dictionary,
            doc_topic_dists=None,
        )
        topic_term_dists = _df_with_names(
            vis_attr_dict['topic_term_dists'],
            'topic', 'term',
        )
        doc_topic_dists = _df_with_names(
            vis_attr_dict['doc_topic_dists'],
            'doc', 'topic',
        )
        term_frequency = _series_with_name(
            vis_attr_dict['term_frequency'],
            'term_frequency',
        )
        doc_lengths = _series_with_name(
            vis_attr_dict['doc_lengths'],
            'doc_length',
        )
        vocab = _series_with_name(
            vis_attr_dict['vocab'],
            'vocab',
        )

        ## Topic
        topic_freq = (doc_topic_dists.T * doc_lengths).T.sum()
        topic_proportion = (topic_freq / topic_freq.sum())

        # token counts for each term-topic combination
        term_topic_freq = (topic_term_dists.T * topic_freq).T
        term_frequency = np.sum(term_topic_freq, axis=0)

        ## Term
        term_proportion = term_frequency / term_frequency.sum()

        # compute the distinctiveness and saliency of the terms
        topic_given_term = topic_term_dists / topic_term_dists.sum()
        kernel = (topic_given_term *
                  np.log((topic_given_term.T / topic_proportion).T))
        distinctiveness = kernel.sum()
        saliency = term_proportion * distinctiveness

        default_tinfo_df = pd.DataFrame(
            {
                'saliency': saliency,
                'term': vocab,
                'freq': term_frequency,
                'total': term_frequency,
                'category': 'default',
                'logprob': np.arange(len(vocab), 0, -1),
                'loglift': np.arange(len(vocab), 0, -1),
            }
        )

        log_lift = np.log(topic_term_dists / term_proportion)
        log_prob = log_ttd = np.log(topic_term_dists)

        # This branch never produces the five return values; previously it
        # fell through to `return` and failed with an UnboundLocalError.
        raise NotImplementedError(
            '`use_gensim_prepared=False` is not implemented yet; '
            'call with `use_gensim_prepared=True`.'
        )

    return _prepared, tinfo_df, relevant_terms_df, r_score_df, bow_score_list


[docs]class TopicModeler(object): """Topic Modeling via LDA(Latent Diriclet Allocation). Get tokenized from text. Parameters ---------- sentence_list: list A list of raw sentences. tokenized_sentence_list: list A nested list of tokenized sentences. Attributes ---------- After `__init__`: self.sentences: A list of raw sentences. self.tokenized: list A nested list of tokenized sentences. self.corpora_dict: `gensim.corpora.dictionary.Dictionary` A token dictionary from a given text. self.bow_corpus_idx: list A nested list, which contains converted documents into a list of token indices. self.bow_corpus_doc: list A nested list, which contains converted documents into a list of token words. After `train_lda` or `load_lda`: self.best_lda_model: dict A dict contains the best model & its coherence value. `{'coherence': int, 'model':gensim.models.ldamulticore.LdaMulticore}` self.lda_model_list = model_coh_list A nested list of `[topic_num, model, coherence_value]` self.lda_model_dict: A nested dict as `{topic_num: {'coherence': int, 'model': `gensim.models.ldamulticore.LdaMulticore`}}` self.trained: bool `True` If trained or properly loaded. After `visualize_lda_to_html`: self.selected_topic_num: int A int of selected topic number. self.selected_model: `gensim.models.ldamulticore.LdaMulticore` self.vis_prepared: `pyLDAvis.prepared_data.PreparedData` self.total_terms_df `tinfo_table`, `'Default'` removed. self.top_relevant_terms_df: `pandas.DataFrame` A rank table of `Category`. self.r_adj_score_df: `pandas.DataFrame` A tinfo table, considering saliency and relevence score. self.bow_score_list: list Scores of each sentence, based on bow_corpus, clipped by (0, 3). After `estimate_topics_by_documents` or `load_estimated`: self.dominant_topic_estimation_df: `pandas.DataFrame` A dataframe contains `['lda_prob', 'dominant_topic', 'contribution', 'topic_keywords']` self.topic_freq_df: `pandas.DataFrame` A rank table by topic frequency. 
After `get_representitive_documents` or `load_representitive_documents`: self.representitive_docs: `pandas.DataFrame` After `get_representitive_candidates`: return `repr_sentences, repr_bow_corpus_doc, repr_bow_corpus_idx` Methods ------- train_lda save_lda load_lda pick_best_lda_topics visualize_lda_to_html estimate_topics_by_documents get_representitive_documents See Also -------- Preprocessing ``unipy_nlp.preprocessing.Preprocessor`` POS-Tagging ``konlpy.tag.Mecab`` Byte-Pair Encoding ``sentencepiece`` Examples -------- >>> import unipy_nlp.data_collector as udcl >>> import unipy_nlp.preprocessing as uprc >>> import unipy_nlp.analyze.topic_modeling as utpm >>> from pprint import pprint >>> prep = uprc.Preprocessor() >>> prep.read_json('./data/_tmp_dump/prep/rawdata_collected.json') >>> sentence_for_pos_list = [ ... "무궁화 꽃이 피었습니다." ... "우리는 민족중흥의 역사적 사명을 띠고 이 땅에 태어났다.", ... ] >>> tokenized = prep.pos_tag( ... input_text=sentence_for_pos_list, ... tag_type=[ ... '체언 접두사', '명사', '한자', '외국어', ... '수사', '구분자', ... '동사', ... '부정 지정사', '긍정 지정사', ... ] ... ) >>> print(tokenized) [['무궁화'], ['우리', '민족중흥', '역사', '사명']] >>> tpm = utpm.TopicModeler(sentence_list, tokenized) >>> tpm.train_lda( ... num_topic=5, ... workers_n=8, ... random_seed=1, ... ) >>> tpm.save_lda(savepath='data/_tmp_dump/topic_modeling', affix='lda') >>> tpm.load_lda('data/_tmp_dump/topic_modeling') >>> tpm.pick_best_lda_topics( ... num_topic_list=[5, 7, 10], ... workers_n=8, ... random_seed=1, ... ) >>> tpm.visualize_lda_to_html( ... 7, ... top_n=10, ... r_normalized=False, ... relevence_lambda_val=.6, ... workers_n=8, ... random_seed=1, ... savepath='data/_tmp_dump/topic_modeling', ... filename_affix='lda', ... # save_type='html', # {'html', 'json'} ... save_relevent_terms_ok=True, ... save_html_ok=True, ... display_ok=False, ... ) >>> sentence_labeled = tpm.estimate_topics_by_documents( ... 7, ... # sentence_list=tokenized, ... random_seed=1, ... save_ok=True, ... 
savepath='data/_tmp_dump/topic_modeling', ... filename_affix='lda', ... ) >>> sentence_repr = tpm.get_representitive_documents( ... 7, ... len_range=(10, 30), ... top_n=10, ... save_ok=True, ... savepath='data/_tmp_dump/topic_modeling', ... filename_affix='lda', ... ) """ def __init__( self, sentence_list, tokenized_sentence_list, ): self.trained = False # {'trained', 'loaded'} self.selected_model = None self.lda_model_list = None self.sentences = sentence_list self.tokenized = tokenized_sentence_list self.corpora_dict = gensim.corpora.Dictionary( tokenized_sentence_list ) self.corpora_dict.filter_extremes( no_below=30, no_above=.5, keep_n=100000, ) self.bow_corpus_idx = [ self.corpora_dict.doc2idx(doc) for doc in tokenized_sentence_list ] self.bow_corpus_doc = [ self.corpora_dict.doc2bow(doc) for doc in tokenized_sentence_list ]
[docs] def train_lda( self, num_topic=5, lda_type='default', workers_n=2, random_seed=1, ): """ Train a single LDA Topic Model. Parameters ---------- num_topics: int (default: 5) A number of topics. lda_type: str (default: `'default'`, `{'default', 'hdp', 'mallet'}`) A type of LDA model. Use `'default'` for now. Other options are working in progress. workers_n: int (default: 2) A number of CPU core to train. random_seed: int (default: 1) A random seed int. Example ------- >>> import unipy_nlp.data_collector as udcl >>> import unipy_nlp.preprocessing as uprc >>> import unipy_nlp.analyze.topic_modeling as utpm >>> from pprint import pprint >>> prep = uprc.Preprocessor() >>> prep.read_json('./data/_tmp_dump/prep/rawdata_collected.json') >>> sentence_for_pos_list = [ ... "무궁화 꽃이 피었습니다." ... "우리는 민족중흥의 역사적 사명을 띠고 이 땅에 태어났다.", ... ] >>> tokenized = prep.pos_tag( ... input_text=sentence_for_pos_list, ... tag_type=[ ... '체언 접두사', '명사', '한자', '외국어', ... '수사', '구분자', ... '동사', ... '부정 지정사', '긍정 지정사', ... ] ... ) >>> tpm = utpm.TopicModeler(sentence_list, tokenized) >>> tpm.train_lda( ... num_topic=5, ... workers_n=8, ... random_seed=1, ... ) """ self.pick_best_lda_topics( num_topic_list=[num_topic], lda_type=lda_type, workers_n=workers_n, random_seed=random_seed, ) self.trained = True
[docs] def pick_best_lda_topics( self, num_topic_list=[5, 7, 10, 12, 15, 17, 20], lda_type='default', workers_n=2, random_seed=1, ): """ Train multiple LDA Topic Models by given topic numbers. Parameters ---------- num_topic_list: list (default: `[5, 7, 10, 12, 15, 17, 20]`) A list of topic numbers. lda_type: str (default: `'default'`, `{'default', 'hdp', 'mallet'}`) A type of LDA model. Use `'default'` for now. Other options are working in progress. workers_n: int (default: 2) A number of CPU core to train. random_seed: int (default: 1) A random seed int. Example ------- >>> import unipy_nlp.analyze.topic_modeling as utpm >>> tpm = utpm.TopicModeler(sentence_list, tokenized) >>> tpm.pick_best_lda_topics( ... num_topic=5, ... workers_n=8, ... random_seed=1, ... ) """ (best_lda_model, model_coh_list, model_dict, coh_list) = pick_best_topics( dictionary=self.corpora_dict, corpus=self.bow_corpus_doc, texts=self.tokenized, num_topic_list=num_topic_list, lda_type=lda_type, workers_n=workers_n, random_seed=random_seed, ) self.best_lda_model = best_lda_model self.lda_model_list = model_coh_list self.lda_model_dict = model_dict self.trained = True
[docs] def save_lda(self, savepath='./', affix='lda'): """ Save trained lda model(s). Parameters ---------- savepath: str (default: `'./'`) A dirpath to save. affix: str (default: `'lda'`) An affix of filename. Its ext will be `.ldamodel`. Example ------- >>> import unipy_nlp.analyze.topic_modeling as utpm >>> tpm = utpm.TopicModeler(sentence_list, tokenized) >>> tpm.pick_best_lda_topics( ... num_topic=5, ... workers_n=8, ... random_seed=1, ... ) >>> tpm.save_lda(savepath='data/_tmp_dump/topic_modeling', affix='lda') """ os.makedirs(savepath, exist_ok=True) corpora_filename = os.path.join( savepath, f'{affix}.cdict', ) self.corpora_dict.save_as_text( corpora_filename, sort_by_word=False, ) for _topic_num, _inner_dict in self.lda_model_dict.items(): _model = _inner_dict['model'] _coh_val = _inner_dict['coherence'] model_name_str = '_'.join([ f'{affix}', f'topics-{_topic_num}', f'coh-{_coh_val}.ldamodel', ]) print(f'{savepath:2}: {model_name_str}') _filename = os.path.join( savepath, model_name_str, ) _model.save(_filename)
[docs] def load_lda(self, filepath): """ Load trained lda model(s). Parameters ---------- filepath: str A dirpath to load. It contains `.ldamodel`. Example ------- >>> import unipy_nlp.analyze.topic_modeling as utpm >>> tpm = utpm.TopicModeler(sentence_list, tokenized) >>> tpm.load_lda('data/_tmp_dump/topic_modeling') """ if os.path.isfile(filepath): self.lda_model_list = [] self.lda_model_dict = {} path_str = '/'.join(filepath.split('/')[:-1]) model_name_str = filepath.split('/')[-1] affix, _topic_num, _coh_val = re.findall( r'^(.+)_topics\-(\d+)_coh\-([-+]?\d*\.\d+|\d+)\.ldamodel', model_name_str, )[0] _topic_num, _coh_val = int(_topic_num), int(_coh_val) _model = gensim.models.LdaMulticore.load(filepath) self.lda_model_list = [(_topic_num, _model, _coh_val)] self.lda_model_dict.__setitem__( _topic_num, {'model': _model, 'coherence': _coh_val}, ) corpora_filename = os.path.join( path_str, f'{affix}.cdict', ) self.corpora_dict = gensim.corpora.Dictionary.load_from_text( corpora_filename, ) self.best_lda_model = self.lda_model_dict[_topic_num] print(f'Model loaded: topics={_topic_num}, coh={_coh_val}') elif os.path.isdir(filepath): self.lda_model_list = [] self.lda_model_dict = {} path_str = filepath filepath_list = glob(os.path.join(filepath, '*.ldamodel')) for filename in filepath_list: model_name_str = filename.split('/')[-1] affix, _topic_num, _coh_val = re.findall( r'^(.+)_topics\-(\d+)_coh\-([-+]?\d*\.\d+|\d+).ldamodel', model_name_str, )[0] _topic_num, _coh_val = int(_topic_num), int(_coh_val) _model = gensim.models.LdaMulticore.load(filename) self.lda_model_list += [(_topic_num, _model, _coh_val)] self.lda_model_dict.__setitem__( _topic_num, {'model': _model, 'coherence': _coh_val}, ) print(f'Model loaded: topics={_topic_num}, coh={_coh_val}') corpora_filename = os.path.join( path_str, f'{affix}.cdict', ) self.corpora_dict = gensim.corpora.Dictionary.load_from_text( corpora_filename, ) best_topic_num, best_model, best_coh_val = max( self.lda_model_list, 
key=lambda x: x[-1], ) self.best_lda_model = self.lda_model_dict[int(best_topic_num)] self.trained = True
def _get_terminfo_table(self, *args, **kwargs): return get_terminfo_table(*args, **kwargs)
[docs] def visualize_lda_to_html( self, target_topic_num, top_n=10, r_normalized=False, relevence_lambda_val=.6, workers_n=2, random_seed=1, savepath='./', filename_affix='lda', # save_type='html', # {'html', 'json'} save_relevent_terms_ok=True, save_html_ok=True, display_ok=False, ): """ Run `pyLDAvis.prepare` & get adjusted scores(use saliency & relevence) of terms by each topic. Parameters ---------- target_topic_num: int A topic number of LDA model to visualize. top_n: int (default: `10`) A number of the most relevent terms in a topic. r_normalized: bool (default: `False`) Use normalized probabilities when it is `True`. (not recommended in most cases.) relevence_lambda_val: float (defautl: `.6`). A lambda value(ratio) to calculate relevence. workers_n: int (default: `2`) A number of CPU cores to calculate(`pyLDAvis.prepare`) random_seed: int (default: `1`) A random seed number. savepath: str (default: `'./'`) A dirpath to save `pyLDAvis` or other `pandas.DataFrame`s. filename_affix: str (default: `'lda'`) An affix of filename to save `pyLDAvis` html or json. save_relevent_terms_ok: bool (default: `True`) An option to save `pandas.DataFrame` of `top_relevent_terms`. save_html_ok: bool (default: `True`) An option to save html. display_ok: bool (default: `False`) Call `pyLDAvis.display` when it is `True`. References ---------- Saliency: `Chuang, J., 2012. Termite: Visualization techniques for assessing textual topic models` Relevence: `Sievert, C., 2014. LDAvis: A method for visualizing and interpreting topics` Example ------- >>> import unipy_nlp.analyze.topic_modeling as utpm >>> tpm = utpm.TopicModeler(sentence_list, tokenized) >>> tpm.pick_best_lda_topics( ... num_topic=5, ... workers_n=8, ... random_seed=1, ... ) >>> tpm.visualize_lda_to_html( ... 7, ... top_n=10, ... r_normalized=False, ... relevence_lambda_val=.6, ... workers_n=8, ... random_seed=1, ... savepath='data/_tmp_dump/topic_modeling', ... filename_affix='lda', ... save_relevent_terms_ok=True, ... 
save_html_ok=True, ... display_ok=False, ... ) """ if target_topic_num in self.lda_model_dict.keys(): self.selected_topic_num = target_topic_num self.selected_model = ( self.lda_model_dict[target_topic_num]['model'] ) else: raise KeyError("Model doesn't exist. Select a proper number.") (vis_prepared, total_terms_df, top_relevant_terms_df, r_adj_score_df, bow_score_list) = self._get_terminfo_table( self.selected_model, corpus=self.bow_corpus_doc, dictionary=self.corpora_dict, doc_topic_dists=None, use_gensim_prepared=True, top_n=top_n, r_normalized=r_normalized, relevence_lambda_val=relevence_lambda_val, workers_n=workers_n, random_seed=random_seed, ) self.vis_prepared = vis_prepared self.total_terms_df = total_terms_df self.top_relevant_terms_df = top_relevant_terms_df self.r_adj_score_df = r_adj_score_df self.bow_score_list = bow_score_list if save_html_ok: os.makedirs(savepath, exist_ok=True) ldavis_filename_html_str = os.path.join( savepath, f'{filename_affix}_topics-{target_topic_num}.html', ) pyLDAvis.save_html( self.vis_prepared, ldavis_filename_html_str, ) print(f"LDAVIS HTML Saved: '{ldavis_filename_html_str}'") if save_relevent_terms_ok: os.makedirs(savepath, exist_ok=True) ldavis_filename_rdf_str = os.path.join( savepath, '_'.join([ f'{filename_affix}', f'topics-{target_topic_num}', f'top{top_n}_relevent_terms_df.csv', ]), ) self.top_relevant_terms_df.to_csv( ldavis_filename_rdf_str, index=True, header=True, encoding='utf-8', ) print(f"LDAVIS DF Saved: '{ldavis_filename_rdf_str}'") if display_ok: pyLDAvis.display(self.vis_prepared, local=False)
def estimate_topics_by_documents(
        self,
        target_topic_num,
        random_seed=1,
        save_ok=True,
        savepath='./',
        filename_affix='lda',
        ):
    """
    Get the dominant topic & its contribution score for each document.

    For every document the (topic, score) pairs are sorted by score in
    descending order and the first pair is taken as the dominant topic.
    The scores come from `self.bow_score_list` when it is not `None`,
    otherwise from `self.selected_model[self.bow_corpus_doc]`.

    Parameters
    ----------
    target_topic_num: int
        A topic number of LDA model to use.
        It must be equal to `self.selected_topic_num`.

    random_seed: int (default: `1`)
        A random seed number. When truthy, both `random` and
        `numpy.random` are seeded with it.

    save_ok: bool (default: `True`)
        Save the topic-labeled `pandas.DataFrame` as a CSV file.

    savepath: str (default: `'./'`)
        A dirpath to save the topic-labeled sentences.
        It is created when it does not exist.

    filename_affix: str (default: `'lda'`)
        An affix of filename to save the topic-labeled sentences.

    Return
    ------
    dominant_topic_estimation_df: `pandas.DataFrame`
        Topic-labeled given(trained) sentences, with the columns
        `doc_num`, `dominant_topic`, `contribution`, `topic_keywords`,
        `documents` and `lda_prob`.

    topic_freq_df: `pandas.DataFrame`
        A rank table of topics by frequency (document count per
        dominant topic, sorted in descending order).

    Raises
    ------
    ValueError
        If `target_topic_num` differs from `self.selected_topic_num`.

    Example
    -------
    >>> import unipy_nlp.analyze.topic_modeling as utpm
    >>> tpm = utpm.TopicModeler(sentence_list, tokenized)
    >>> tpm.pick_best_lda_topics(
    ...     num_topic=5,
    ...     workers_n=8,
    ...     random_seed=1,
    ... )
    >>> sentence_labeled, topic_freq = tpm.estimate_topics_by_documents(
    ...     7,
    ...     random_seed=1,
    ...     save_ok=True,
    ...     savepath='data/_tmp_dump/topic_modeling',
    ...     filename_affix='lda',
    ... )

    """
    if target_topic_num != self.selected_topic_num:
        raise ValueError(
            'You should run `visualize_lda_to_html` first.'
        )

    lda_model = self.selected_model
    corpus = self.bow_corpus_doc
    docs = self.sentences
    bow_r_score_list = self.bow_score_list
    top_r_terms_df = self.r_adj_score_df

    res_df = pd.DataFrame(
        columns=[
            'dominant_topic',
            'contribution',
            'topic_keywords',
            'documents',
            'lda_prob',
        ]
    )

    if random_seed:
        random.seed(random_seed)
        np.random.seed(random_seed)

    # Keyword strings per topic are only available when the adjusted
    # score table exists; otherwise keywords come from the model itself.
    top_word_str = None
    if top_r_terms_df is not None:
        # The score column is the first one left after dropping the
        # two label columns. This lookup must stay inside the guard:
        # evaluating it on `None` made the fallback below unreachable.
        r_colname = top_r_terms_df.columns.drop(['Category', 'Term'])[0]
        top_sorted_words = groupby_top_n(
            top_r_terms_df.reset_index(),
            group_by=['category_num'],
            order_by=[r_colname],
            ascending=False,
            n=10,
        )
        top_word_str = (
            top_sorted_words
            .groupby(level=0)
            ['Term']
            .apply(lambda x: ', '.join(x.tolist()))
        )

    def sort_prob(prob_row):
        # Highest score first, so that index 0 is the dominant topic.
        return sorted(prob_row, key=lambda x: x[1], reverse=True)

    def get_dominant_prob(prob_row):
        # Expands the leading (topic, score) pair into two columns.
        return pd.Series(prob_row[0])

    def get_topic_keywords(dom_topic_num):
        if top_word_str is not None:
            return top_word_str[int(dom_topic_num)]
        return ', '.join(
            np.array(lda_model.show_topic(int(dom_topic_num)))[:, 0]
        )

    res_df['documents'] = docs

    if bow_r_score_list is not None:
        bow_score_series = pd.Series(bow_r_score_list)
    else:
        bow_score_series = pd.Series(lda_model[corpus])

    res_df['lda_prob'] = bow_score_series.apply(sort_prob)
    res_df[['dominant_topic', 'contribution']] = (
        res_df['lda_prob']
        .apply(get_dominant_prob)
    )
    res_df['dominant_topic'] = (
        res_df['dominant_topic']
        .astype(int)
        .astype('category')
    )
    res_df['topic_keywords'] = (
        res_df['dominant_topic']
        .apply(get_topic_keywords)
    )
    # Keep the full per-document distribution as a {topic: score} dict.
    res_df['lda_prob'] = res_df['lda_prob'].apply(dict)

    res_df.index.name = 'doc_num'
    res_df.reset_index(inplace=True)

    self.dominant_topic_estimation_df = res_df
    self.topic_freq_df = (
        self.dominant_topic_estimation_df
        .groupby('dominant_topic')
        ['doc_num']
        .count()
        .reset_index()
        .sort_values('doc_num', ascending=False)
    )

    if save_ok:
        os.makedirs(savepath, exist_ok=True)
        filename_str = os.path.join(
            savepath,
            '_'.join([
                f'{filename_affix}',
                f'topics-{target_topic_num}',
                'dominant_topic_estimation_df.csv',
            ]),
        )
        res_df.to_csv(
            filename_str,
            index=False,
            header=True,
            encoding='utf-8',
        )

    return self.dominant_topic_estimation_df, self.topic_freq_df
def load_estimated(
        self,
        target_topic_num,
        savepath='./',
        filename_affix='lda'
        ):
    """
    Load the CSV written by `self.estimate_topics_by_documents`.

    The file is read from
    `<savepath>/<filename_affix>_topics-<target_topic_num>_dominant_topic_estimation_df.csv`.
    The loaded table is stored on `self.dominant_topic_estimation_df`
    and the per-topic document count is rebuilt into
    `self.topic_freq_df`.

    Parameters
    ----------
    target_topic_num: int
        A topic number of LDA model, used to build the filename.

    savepath: str (default: `'./'`)
        A dirpath to load the topic-labeled sentences from.

    filename_affix: str (default: `'lda'`)
        An affix of filename to load the topic-labeled sentences.

    Return
    ------
    dominant_topic_estimation_df: `pandas.DataFrame`
        The table read from the CSV file.

    topic_freq_df: `pandas.DataFrame`
        Count of `doc_num` per `dominant_topic`, sorted by count in
        descending order.

    Example
    -------
    >>> import unipy_nlp.analyze.topic_modeling as utpm
    >>> tpm = utpm.TopicModeler(sentence_list, tokenized)
    >>> sentence_labeled, topic_freq = tpm.load_estimated(
    ...     target_topic_num=7,
    ...     savepath='data/_tmp_dump/topic_modeling',
    ...     filename_affix='lda',
    ... )

    """
    csv_name = '_'.join([
        f'{filename_affix}',
        f'topics-{target_topic_num}',
        'dominant_topic_estimation_df.csv',
    ])
    estimation_df = pd.read_csv(
        os.path.join(savepath, csv_name),
        encoding='utf-8',
    )
    self.dominant_topic_estimation_df = estimation_df

    # Rebuild the frequency table instead of persisting it separately.
    docs_per_topic = (
        self.dominant_topic_estimation_df
        .groupby('dominant_topic')['doc_num']
        .count()
    )
    self.topic_freq_df = (
        docs_per_topic
        .reset_index()
        .sort_values('doc_num', ascending=False)
    )

    return self.dominant_topic_estimation_df, self.topic_freq_df
def get_best_n_terms(self):
    """Placeholder without an implementation; always returns `None`."""
    return None
def get_representitive_candidates(
        self,
        len_range=(10, 30),
        ):
    """
    Select the documents whose sentence length lies within a range.

    A document is kept when `len()` of its entry in `self.sentences`
    satisfies `len_range[0] <= length < len_range[1]`. The same
    selection is applied, position by position, to
    `self.bow_corpus_doc` and `self.bow_corpus_idx`.
    It is for to use `unipy_nlp.network_plot`.

    Parameters
    ----------
    len_range: `list` or `tuple` (default: `(10, 30)`)
        A `(min, max)` pair; the lower bound is inclusive and the
        upper bound is exclusive.

    Return
    ------
    repr_sentences: `list`
        The kept entries of `self.sentences`.

    repr_bow_corpus_doc: `list`
        The kept entries of `self.bow_corpus_doc`.

    repr_bow_corpus_idx: `list`
        The kept entries of `self.bow_corpus_idx`.

    Example
    -------
    >>> import unipy_nlp.analyze.topic_modeling as utpm
    >>> tpm = utpm.TopicModeler(sentence_list, tokenized)
    >>> (repr_sentences,
    ...  repr_bow_corpus_doc,
    ...  repr_bow_corpus_idx) = tpm.get_representitive_candidates(
    ...     len_range=(12, 30),
    ... )

    """
    lower, upper = len_range
    keep_flags = [
        lower <= len(sentence) < upper
        for sentence in self.sentences
    ]

    def pick(items):
        # Keep the items whose positional flag is set.
        return [item for item, keep in zip(items, keep_flags) if keep]

    repr_bow_corpus_idx = pick(self.bow_corpus_idx)
    repr_bow_corpus_doc = pick(self.bow_corpus_doc)
    repr_sentences = pick(self.sentences)

    return repr_sentences, repr_bow_corpus_doc, repr_bow_corpus_idx
def _clip_document_len( self, topic_kwd_df, len_range=(10, 30), ): len_min, len_max = len_range len_series = topic_kwd_df['documents'].apply(len) # mask = np.where((len_series >= 100) & (len_series < 300)) res = topic_kwd_df.loc[ (len_series >= len_min) & (len_series < len_max), : ] return res
def get_representitive_documents(
        self,
        target_topic_num,
        len_range=(10, 30),
        top_n=10,
        save_ok=True,
        savepath='./',
        filename_affix='lda',
        ):
    """
    List-up the most representitive documents by topic.

    Documents are first filtered by length with
    `self._clip_document_len`, then the `top_n` documents with the
    highest `contribution` are picked per `dominant_topic`. The
    per-topic groups are concatenated in the row order of
    `self.topic_freq_df`.

    Parameters
    ----------
    target_topic_num: int
        A topic number of LDA model to use.
        It must be equal to `self.selected_topic_num`.

    len_range: `list` or `tuple` (default: `(10, 30)`)
        A candidate threshold by length.

    top_n: int (default: `10`)
        A document number to list-up, by topic.

    save_ok: bool (default: `True`)
        An option to save the result as a CSV file.

    savepath: str (default: `'./'`)
        A dirpath to save the representitive documents.
        It is created when it does not exist.

    filename_affix: str (default: `'lda'`)
        An affix of filename to save the representitive documents.

    Return
    ------
    reordered: `pandas.DataFrame`
        Representitive documents, grouped by topic. Topics without
        any document inside `len_range` are left out.

    Raises
    ------
    ValueError
        If `target_topic_num` differs from `self.selected_topic_num`,
        or if `self.dominant_topic_estimation_df` is `None`.

    Example
    -------
    >>> import unipy_nlp.analyze.topic_modeling as utpm
    >>> tpm = utpm.TopicModeler(sentence_list, tokenized)
    >>> tpm.pick_best_lda_topics(
    ...     num_topic=5,
    ...     workers_n=8,
    ...     random_seed=1,
    ... )
    >>> sentence_labeled, topic_freq = tpm.estimate_topics_by_documents(
    ...     7,
    ...     random_seed=1,
    ...     save_ok=True,
    ...     savepath='data/_tmp_dump/topic_modeling',
    ...     filename_affix='lda',
    ... )
    >>> sentence_repr = tpm.get_representitive_documents(
    ...     7,
    ...     len_range=(10, 30),
    ...     top_n=10,
    ...     save_ok=True,
    ...     savepath='data/_tmp_dump/topic_modeling',
    ...     filename_affix='lda',
    ... )

    """
    if target_topic_num != self.selected_topic_num:
        raise ValueError(
            'You should run `visualize_lda_to_html` first.'
        )
    if self.dominant_topic_estimation_df is None:
        raise ValueError(
            "You should run `estimate_topics_by_documents` first."
        )

    repr_candidates_df = self._clip_document_len(
        self.dominant_topic_estimation_df,
        len_range=len_range,
    )
    repr_docs_df = groupby_top_n(
        repr_candidates_df,
        group_by=['dominant_topic'],
        order_by=['contribution'],
        ascending=False,
        n=top_n,
    )
    grouped_dict = dict(list(repr_docs_df.groupby(level=0)))

    # A topic listed in `topic_freq_df` can lose all of its documents
    # to the length filter; skip it instead of failing on the lookup.
    ordered_groups = [
        grouped_dict[topic]
        for topic in self.topic_freq_df['dominant_topic']
        if topic in grouped_dict
    ]
    # `pd.concat` rejects an empty list, so fall back to the (empty)
    # top-n table when nothing survived the filter.
    reordered = pd.concat(ordered_groups) if ordered_groups else repr_docs_df

    self.representitive_docs = reordered

    if save_ok:
        os.makedirs(savepath, exist_ok=True)
        # The filename uses the caller's `target_topic_num`, the same
        # value `load_representitive_documents` builds its path from.
        filename_str = os.path.join(
            savepath,
            '_'.join([
                f'{filename_affix}',
                f'topics-{target_topic_num}',
                f'top{top_n}_repr_docs_df.csv',
            ]),
        )
        reordered.to_csv(
            filename_str,
            index=True,
            header=True,
            encoding='utf-8',
        )

    return reordered
def load_representitive_documents(
        self,
        target_topic_num,
        top_n=10,
        savepath='./',
        filename_affix='lda',
        ):
    """
    Load the CSV written by `self.get_representitive_documents`.

    The file is read from
    `<savepath>/<filename_affix>_topics-<target_topic_num>_top<top_n>_repr_docs_df.csv`
    and the resulting table is stored on `self.representitive_docs`.

    Parameters
    ----------
    target_topic_num: int
        A topic number of LDA model, used to build the filename.

    top_n: int (default: `10`)
        The per-topic document count, used to build the filename.

    savepath: str (default: `'./'`)
        A dirpath to load the representitive documents from.

    filename_affix: str (default: `'lda'`)
        An affix of filename to load the representitive documents.

    Return
    ------
    representitive_docs: `pandas.DataFrame`
        The table read from the CSV file.

    Example
    -------
    >>> import unipy_nlp.analyze.topic_modeling as utpm
    >>> tpm = utpm.TopicModeler(sentence_list, tokenized)
    >>> sentence_repr = tpm.load_representitive_documents(
    ...     target_topic_num=7,
    ...     top_n=10,
    ...     savepath='data/_tmp_dump/topic_modeling',
    ...     filename_affix='lda',
    ... )

    """
    csv_name = '_'.join([
        f'{filename_affix}',
        f'topics-{target_topic_num}',
        f'top{top_n}_repr_docs_df.csv',
    ])
    csv_path = os.path.join(savepath, csv_name)
    self.representitive_docs = pd.read_csv(csv_path, encoding='utf-8')
    return self.representitive_docs