Source code for unipy_nlp.data_collector

# -*- coding: utf-8 -*-
"""Get Data from `xlsx`.

"""

# %%

import os
import re
import sys
import json
import random
import warnings
import subprocess
import itertools as it
import functools as ft
import collections
from glob import glob
from pprint import pprint
import numpy as np
import pandas as pd
import unidecode
from unicodedata import normalize

import matplotlib.font_manager as fm


# warnings.filterwarnings("ignore", category=DeprecationWarning)
# warnings.filterwarnings("ignore", category=FutureWarning)

import subprocess


__all__ = []
__all__ += [
    # 'read_xlsx_all_sheets',
    # 'read_xlsx_usymp',
    # 'collect_reply',
    'load_from_excel',
    'recursive_substitutor',
    # 'refine_content',
    'refine_nested_excel_to_dict',
    # 'split_and_filter',
    # 'split_and_expand_str_rows',
    'collect_data',
]


print(os.getcwd())
font_dict = {
    path.split('/')[-1][:-4]: path
    for path in fm.get_fontconfig_fonts()
    if 'nanum' in path.lower().split('/')[-1]
}

for _font_nm, _font_file in font_dict.items():
    subprocess.call(['cp', _font_file, './unipy_nlp/_resources/fonts/'])


def read_xlsx_all_sheets(filepath):
    print(f"Loading '{filepath}'...")
    res = pd.read_excel(
        filepath,
        sheet_name=None,
        header=None, names=['content'],
        dtype=str,
        encoding='utf-8',
    )
    return res


def read_xlsx_usymp(filepath):
    print(f"Loading '{filepath}'...")
    sheet_name = 'Clientes'
    tmp = pd.read_excel(
        filepath,
        sheet_name=sheet_name,
        dtype=str,
        encoding='utf-8',
    ).replace('nan', np.NaN)

    tmp.columns = [
        c.replace('Unnamed: ', 'un_')
        if 'Unnamed: ' in c
        else c
        for c in tmp.columns
    ]

    tmp['title_agg'] = tmp.iloc[:, :4].apply(
        lambda x: x.dropna().max(),
        axis=1,
    )
    tmp['reply_agg'] = (
        tmp.loc[:, tmp.columns[tmp.columns.str.contains('un_')][3:]]
        .apply(lambda x: x.dropna().max(), axis=1)
    )

    tmp['title_yn'] = tmp['조회수'].notnull()

    tmp['title'] = tmp.loc[tmp['title_yn'] == True, ['title_agg']]
    tmp['body'] = tmp.loc[tmp['title_yn'].shift(1) == True, ['title_agg']]
    tmp['body'] = tmp['body'].shift(-1)

    idx_srs = tmp['title'].dropna().index.to_series()

    idx_start = idx_srs + 2

    idx_end = idx_srs.shift(-1)
    idx_end.iloc[-1] = tmp.index[-1]

    idx_range_df = (
        pd.DataFrame(
            [idx_start, idx_end],
            index=['start', 'end']
        )
        .T
    ).astype(np.int)

    tmp['reply_idx'] = idx_range_df.apply(
        lambda x: list(range(x['start'], x['end'])),
        axis=1,
    )

    def collect_reply(df, reply_idx):
        if reply_idx not in ([], np.NaN):
            res = df.loc[reply_idx, 'reply_agg'].dropna().tolist()
        else:
            res = []
        return res

    tmp['reply'] = (
        tmp['reply_idx']
        .apply(lambda x: collect_reply(tmp, x))
        .apply(lambda x: '\n'.join(x))
    )
    # tmp['reply_joined'] = (
    #     tmp['reply']
    #     .apply(lambda x: '\n'.join(x))
    # )
    tmp = tmp[['title', 'body', 'reply']].dropna().reset_index(drop=True)
    tmp['content'] = (
        pd.Series(
            tmp[tmp.columns.drop('reply')]
            .fillna('')
            .values
            .tolist()
        )
        .str.join('\n')
    )

    return collections.OrderedDict({sheet_name: tmp})


[docs]def load_from_excel(filepath) -> collections.Generator: fpath = filepath dump_path = f'{fpath}/_tmp_dump' if not os.path.isdir(dump_path): os.makedirs(dump_path, exist_ok=False) print(f"'Results will be saved in {dump_path}") filepath_list = glob(f'{fpath}/saveasnew/*.xlsx') category_list = [ re.findall(r'.*rawdata_(.+)_saveasnew.xlsx', s)[0] for s in filepath_list ] loaded_gen = ( (category, read_xlsx_all_sheets(filepath) if 'usymphony' not in category else read_xlsx_usymp(filepath) ) for category, filepath in zip(category_list, filepath_list) )
def _repl(mat, ri=re.compile(r'([\.\,\'\"]+\s*)')): return ri.sub(r'', ''.join(mat.group(0, 2))) # return ri.sub(r'', mat.group(0, 2)) # return ri.sub(r'', mat.group(0)) # def list_keeper_from_recursion(**kwargs): # def wrap(func): # # args_keys = func.__code__.co_varnames # for key, value in kwargs.items(): # setattr(func, key, value) # return func # return wrap # def pattern_list_keeper_from_recursion(recursive_func): # @ft.wraps(recursive_func) # def pattern_list_keeped(*args, **kwargs): # [ # argname # for argname in recursive_func.__code__.co_varnames # if argname not in ['args', 'kwargs'] # ] # args_keys = func.func_code.co_varnames # new_recursive_func = partial(recursive_func, pattern_list=) # return new_recursive_func(*args, **kwargs) # return pattern_list_keeped
[docs]def recursive_substitutor(str_or_series, pattern_list, flags=0): pattern_list = pattern_list.copy() if isinstance(str_or_series, str): if pattern_list: # if len(pattern_list) > 0: pattern, target = pattern_list.pop(0) str_or_series = re.sub(pattern, target, str_or_series, flags=flags) return recursive_substitutor(str_or_series, pattern_list) else: return str_or_series elif isinstance(str_or_series, pd.Series): if pattern_list: pattern, target = pattern_list.pop(0) str_or_series = str_or_series.str.replace( pattern, target, flags=flags, ) return recursive_substitutor(str_or_series, pattern_list, flags=flags) else: return str_or_series else: raise TypeError('`str_or_series` must be a `str` or `pandas.Series`')
def refine_content(df) -> pd.DataFrame: col_suffix = '_refined' raw_col_list = df.columns[~df.columns.str.contains(col_suffix)] new_col_list = raw_col_list + col_suffix print(f'colnames: {raw_col_list.tolist()}') pattern_tuple = ( [r'40(.*)41', r'(\1)'], [r'39(.*)', r'`\1'], [r'\t|\a', r' '], [r'(\r)', r' '], # [r'(\r\n)', r'\n'], [r'(\n)', r' '], [r"(([vox]{1}\\:\* |\.shape+[\s]+)\{.+\})", r''], [r'\b((http?|https|ftp|file)\://\S+)', r''], [r'\(\?\)', r' '], [r'[▷◇△▲▽▼★\<\>\+\-\=_·♬\|\(\)\[\]\{\}\*\'\"\~⓪①②③④⑤⑥⑦⑧⑨⑩]+', r' '], [r'\b(\[답변\]|\[열린소리\]|\[분실물\D*\])', r''], [r'^(.+)([\s]*[(from)(sent)]\: .+(subject)\:).+', _repl], [r'(subject\:|re\:)', r''], [r'(\s*\d{4} [\d]{1,2}:\d{2} [ap]{1}m[\s]{0,1}to:){0,1} (.+)', r'\2'], [r'(nbsp)', r' '], [r'([\s\D]+/[\s\w]+/(sk)[\;]*)', r''], [r'[\;]+', r' '], [r'([0-9]{2,3}\-[0-9]{2,4}\-[0-9]{4})', r''], [r'\S+@][a-zA-Z0-9\._]+', r''], [r'\b(\D+\([^\(\)]+\)/\S+/\S+[:;]*)\b', r''], [r'/', r' '], [r'\s+', r' '], [r'[\^]{2,}', r'\^\^\n'], [r'(\B[음임함됨요(니다)])\b', r'\1\n'], [r'([\.\!\?]+[\s]*)', r'\n'], ) # pattern_list = [(re.compile(ptn), target) for ptn, target in pattern_list] def _replace_list_str(row, pattern_tuple=pattern_tuple) -> list: if len(row) > 0: res = [ recursive_substitutor( normalize('NFKC', row), list(pattern_tuple), flags=re.IGNORECASE, ) for s in row ] else: res = recursive_substitutor( normalize('NFKC', row), list(pattern_tuple), flags=re.IGNORECASE, ) return res for col in raw_col_list: if isinstance(df[col][0], str): df[f'{col}{col_suffix}'] = recursive_substitutor( ( df[col] .apply(lambda s: normalize('NFKC', s)) .str.lower() ), list(pattern_tuple), flags=re.IGNORECASE, ) elif isinstance(df[col][0], list): df[f'{col}{col_suffix}'] = ( df[col] .apply(_replace_list_str) .str.lower() ) res_df = df[new_col_list] res_df.columns = raw_col_list res_dict = res_df.to_dict(orient='list') return res_dict
[docs]def refine_nested_excel_to_dict(xlsx_loaded) -> pd.DataFrame: if isinstance(xlsx_loaded, collections.OrderedDict): for sheet_name in xlsx_loaded.keys(): print(f"{sheet_name}") xlsx_loaded[sheet_name] = refine_content(xlsx_loaded[sheet_name]) if sheet_name == 'Sheet1': new_sheet_name = '19년' xlsx_loaded[new_sheet_name] = xlsx_loaded.pop(sheet_name) else: raise TypeError( '`xlsx_loaded` must be an `collections.OrderedDict` with its sheet name.' ) return xlsx_loaded
def _concat_list(nested_list): return sum(nested_list, []) def split_and_filter( iterable, sep='\n', filter_func=lambda x: x not in (r'\n', r'') ) -> set: concatted = _concat_list(list(iterable)) splitted = _concat_list([s.split(sep) for s in concatted]) filtered = filter(filter_func, splitted) # uniqued = set(filtered) return filtered def split_and_expand_str_rows(dataframe, colname_str, split_by='\n'): expanded_df = dataframe.drop([colname_str], axis=1).join( dataframe [colname_str] .str.split(split_by, expand=True) .stack() .reset_index(level=1, drop=True) .rename(colname_str) ).reset_index(drop=True) expanded_df = ( expanded_df .loc[~expanded_df[colname_str].isin(["", " "]), :] .reset_index(drop=True) ) expanded_df[colname_str] = ( expanded_df [colname_str] .str.strip() # .str.replace(r'(^\s+|\s+$)', r'') ) return expanded_df
[docs]def collect_data( filepath, dump_filepath=None, dump_json_ok=True, return_tuple=True ): """ Summary This function is for to gather text from `xslx` rawdata. Not designed for general uses. Parameters ---------- filepath: str A directory `xslx` file(s) exists. dump_json_ok: bool (default: `True`) `True` if how: {'equal', 'remaining'} The method to split. 'equal' is to split chunks with the approximate length within the given size. 'remaining' is to split chunks with the given size, and the remains are bound as the last chunk. size: int The number of chunks. Returns ------- list A list of chunks. See Also -------- Examples -------- >>> up.splitter(list(range(10)), how='equal', size=3) [(0, 1, 2, 3), (4, 5, 6), (7, 8, 9)] >>> up.splitter(list(range(10)), how='remaining', size=3) [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)] """ # fpath = filepath # dumppath = f'{fpath}/_tmp_dump' dumppath, dumpfile = os.path.split(dump_filepath) if not os.path.isdir(dumppath): os.makedirs(dumppath, exist_ok=False) print(f"'Results will be saved: {dump_filepath}") filepath_list = glob( os.path.join( filepath, '*.xlsx', ) ) # category_list = [ # re.findall(r'.*rawdata_(.+)_saveasnew.xlsx', s)[0] # for s in filepath_list # ] category_list = [ os.path.splitext(os.path.basename(fpath))[0] for fpath in filepath_list ] print(category_list) loaded_gen = ( (category, read_xlsx_all_sheets(filepath) if 'usymp' not in category else read_xlsx_usymp(filepath) ) for category, filepath in zip(category_list, filepath_list) ) refined_gen = ( (category, refine_nested_excel_to_dict(excel_data)) for category, excel_data in loaded_gen ) flatted_gen = ( { 'table_nm': table_nm, 'sheet_nm': sheet_nm, 'contents': contents, } for table_nm, table_contents in refined_gen for sheet_nm, sheet_contents in table_contents.items() for contents in sum(list(sheet_contents.values()), []) # for contents in split_and_filter(sheet_contents.values()) ) res_df = pd.DataFrame(flatted_gen).drop_duplicates() except_cols = res_df.columns.drop('contents').tolist() res_df = ( res_df .groupby(except_cols) ['contents'] .apply(lambda x: ' '.join(x)) .reset_index() ) res_df = split_and_expand_str_rows( res_df, colname_str='contents', split_by=r'\n', ) print(f"\nResult: {res_df.shape}") dump_filename = None if dump_json_ok: dump_filename = dump_filepath res_df.to_json( dump_filename, orient='records', force_ascii=False, lines=True, ) if return_tuple: return res_df, dump_filename else: return { 'data': res_df, 'filename': dump_filename, }