diff --git a/jc/parsers/asciitable_m.py b/jc/parsers/asciitable_m.py index c76f54d1..5af62101 100644 --- a/jc/parsers/asciitable_m.py +++ b/jc/parsers/asciitable_m.py @@ -44,7 +44,7 @@ Examples: [] """ import re -from typing import Iterable, List, Dict, Optional, Generator +from typing import Iterable, Tuple, List, Dict import jc.utils from jc.exceptions import ParseError @@ -73,11 +73,6 @@ def _process(proc_data: List[Dict]) -> List[Dict]: List of Dictionaries. Structured to conform to the schema. """ - # remove newlines from values - # for item in proc_data: - # for k, v in item.items(): - # item[k] = v.replace('\n', '') - return proc_data @@ -115,213 +110,191 @@ def _table_sniff(string: str) -> str: return 'simple' -def _pretty_set_separators(table_lines: Iterable, separator: str) -> Generator[str, None, None]: - """Return a generator that yields rows standardized separators""" - for line in table_lines: - strip_line = line.strip() - - # skip any blanks - if not strip_line: - continue - - # yield row separators as a sentinel string - if strip_line.startswith('╒═') and strip_line.endswith('═╕')\ - or strip_line.startswith('╞═') and strip_line.endswith('═╡')\ - or strip_line.startswith('╘═') and strip_line.endswith('═╛')\ - or strip_line.startswith('┌─') and strip_line.endswith('─┐')\ - or strip_line.startswith('├─') and strip_line.endswith('─┤')\ - or strip_line.startswith('└─') and strip_line.endswith('─┘')\ - or strip_line.startswith('+=') and strip_line.endswith('=+')\ - or strip_line.startswith('+-') and strip_line.endswith('-+'): - yield separator - continue - - # remove the table column separator characters and yield the line - # line = line.replace('|', ' ').replace('│', ' ') - yield line +def _is_separator(line: str) -> bool: + """Returns true if a table separator line is found""" + strip_line = line.strip() + if strip_line.startswith('╒═') and strip_line.endswith('═╕')\ + or strip_line.startswith('╞═') and strip_line.endswith('═╡')\ + or strip_line.startswith('╘═') and strip_line.endswith('═╛')\ + or strip_line.startswith('┌─') and strip_line.endswith('─┐')\ + or strip_line.startswith('├─') and strip_line.endswith('─┤')\ + or strip_line.startswith('└─') and strip_line.endswith('─┘')\ + or strip_line.startswith('+=') and strip_line.endswith('=+')\ + or strip_line.startswith('+-') and strip_line.endswith('-+'): + return True + return False -def _pretty_normalize_rows(table_lines: Iterable, - separator: str, - data_separator: str) -> Generator[str, None, None]: +def _snake_case(line: str) -> str: + """replace spaces between words with an underscores and set to lowercase""" + return re.sub(r'\b \b', '_', line).lower() + + +def _fixup_separators(line: str) -> str: + """Normalize separators, and remove first and last separators""" + # normalize separator + line = line.replace('│', '|') + + # remove first separator if it is the first char in the line + if line[0] == '|': + line = line.replace('|', ' ', 1) + + # remove last separator if it is the last char in the line + if line[-1] == '|': + line = line[::-1].replace('|', ' ', 1)[::-1] + + return line + + +def _normalize_rows(table_lines: Iterable[str]) -> List[Tuple[int, List[str]]]: """ - Return a generator that yields header and data rows with different separators. - Also removes spaces from headers. + Return a List of tuples of a row counters and data lines. """ + result = [] header_found = False data_found = False + row_counter = 0 - # Removes initial table lines, finds the header row(s) and separates - # the header from the data rows with different separator characters. - for i in table_lines: - if separator in i and not header_found and not data_found: - # top table frame + for line in table_lines: + # skip blank lines + if not line.strip(): continue - if not separator in i and not header_found and not data_found: + + # skip top table frame + if _is_separator(line) and not header_found and not data_found: + continue + + # first header row found + if not _is_separator(line) and not header_found and not data_found: header_found = True - # first header data found - # remove spaces from header - i = re.sub(r'\b \b', '_', i) - yield i + line = _snake_case(line) + line = _fixup_separators(line) + line_list = line.split('|') + line_list = [x.strip() for x in line_list] + result.append((row_counter, line_list)) continue - if not separator in i and header_found and not data_found: - # subsequent header data found - # remove spaces from header - i = re.sub(r'\b \b', '_', i) - yield i + + # subsequent header row found + if not _is_separator(line) and header_found and not data_found: + line = _snake_case(line) + line = _fixup_separators(line) + line_list = line.split('|') + line_list = [x.strip() for x in line_list] + result.append((row_counter, line_list)) continue - if separator in i and header_found and not data_found: + + # table separator found - this is a header separator + if _is_separator(line) and header_found and not data_found: data_found = True - # table separator found - this is a header separator - yield separator - continue - if not separator in i and header_found and data_found: - # subsequent data row found - yield i - continue - if separator in i and header_found and data_found: - # table separator found - this is a data separator - yield data_separator + row_counter += 1 continue + # subsequent data row found + if not _is_separator(line) and header_found and data_found: + line = _fixup_separators(line) + line_list = line.split('|') + line_list = [x.strip() for x in line_list] + result.append((row_counter, line_list)) + continue -def _pretty_table_parse(table: Iterable) -> List[Dict]: - temp_table = [] - for line in table: - # normalize separator - line = line.replace('│', '|') + # table separator found - this is a data separator + if _is_separator(line) and header_found and data_found: + row_counter += 1 + continue - # remove first separator if it is the first char in the line - if line[0] == '|': - line = line.replace('|', ' ', 1) - - # remove last separator if it is the last char in the line - if line[-1] == '|': - line = line[::-1].replace('|', ' ', 1)[::-1] - - temp_table.append([x.strip() for x in line.split('|')]) - - headers = temp_table[0] - raw_data = temp_table[1:] - result = [dict.fromkeys(headers, None)] - result.extend([dict(zip(headers, r)) for r in raw_data]) return result -def _pretty_remove_header_rows(table: List[Dict], sep: str, data_sep: str) -> List[Optional[Dict]]: - """return a table with only data rows.""" - # create a new list of row objects with new key names - data_obj_list: List[Optional[Dict]] = [] - sep_found = False - data_sep_found = False - for obj in table: - #skip to data - for v in obj.values(): - if not sep_found and not str(v).strip() == sep: - continue - if not sep_found and str(v).strip() == sep: - sep_found = True - continue - # append data row objects or None for separators - if sep_found: - for k, v in obj.items(): - if str(v).strip() == data_sep: - data_sep_found = True - break - else: - data_sep_found = False - if data_sep_found: - data_obj_list.append(None) +def _get_headers(table: Iterable[Tuple[int, List]]) -> List[List[str]]: + """ + return a list of all of the header rows (which are lists of strings. + [ # headers + ['str', 'str', 'str'], # header rows + ['str', 'str', 'str'] + ] + """ + result = [] + for row_num, line in table: + if row_num == 0: + result.append(line) + return result + + +def _get_data(table: Iterable[Tuple[int, List]]) -> List[List[List[str]]]: + """ + return a list of rows, which are lists made up of lists of strings: + [ # data + [ # rows + ['str', 'str', 'str'] # lines + ] + ] + """ + result: List[List[List[str]]] = [] + current_row = 1 + this_line: List[List[str]] = [] + for row_num, line in table: + if row_num != 0: + if row_num != current_row: + result.append(this_line) + current_row = row_num + this_line = [] + + this_line.append(line) + + if this_line: + result.append(this_line) + + return result + + +def _collapse_headers(table: List[List[str]]) -> List[str]: + """append each column string to return the full header list""" + result = table[0] + for line in table[1:]: + new_line: List[str] = [] + for i, header in enumerate(line): + if header: + new_header = result[i] + '_' + header + new_header = re.sub(r'__+', '_', new_header) + new_line.append(new_header) else: - data_obj_list.append(obj) + new_line.append(result[i]) + result = new_line - # remove first item, which is a separator - return data_obj_list[1:] + return result -def _pretty_map_new_keynames(table: List[Dict], sep: str) -> Dict: - """ - returns a dict of old keyname to new keyname mappings by consolidating - multiline keynames from the input list of dictionaries. - """ - # first get all header objects to find full keynames. Stop when data rows are found. - header_obj_list = [] - sep_found = False - for obj in table: - for v in obj.values(): - if str(v).strip() == sep: - sep_found = True - break - if sep_found: - break - header_obj_list.append(obj) +def _collapse_data(table: List[List[List[str]]]) -> List[List[str]]: + """combine data rows to return a simple list of lists""" + result: List[List[str]] = [] - if not header_obj_list: - header_obj_list = [{key: None for key in table[0]}] + for row in table: + new_row: List[str] = [] + for line in row: + if new_row: + for i, item in enumerate(line): + new_row[i] = (new_row[i] + '\n' + item).strip() + else: + new_row = line - # create an old-key to new-key name mapping dict - new_keynames_dict = dict.fromkeys([key for key in header_obj_list[0]], '') - for item in new_keynames_dict: - new_keynames_dict[item] = item - for obj in header_obj_list: - for k, v in obj.items(): - if v: - new_keynames_dict[k] = new_keynames_dict[k] + '_' + v + result.append(new_row) - # normalize keynames so they are lowercase, no spaces, and no redundat '_'s - for k, v in new_keynames_dict.items(): - new_keynames_dict[k] = v.replace(' ', '_').lower() - new_keynames_dict[k] = re.sub(r'__+', '_', v) - - return new_keynames_dict + return result -def _pretty_rename_keys(table: List, new_keynames: Dict) -> List[Optional[Dict]]: - """rename all of the keys in the table based on the new_keynames mapping""" - renamed_key_table: List[Optional[Dict]] = [] - for item in table: - if item: - renamed_key_table.append({new_keynames[k]:v for k, v in item.items()}) - else: - renamed_key_table.append(None) - - return renamed_key_table - - -def _pretty_consolidate_rows(table: List) -> List[Dict]: - """go through all data objects and combine values between data separators""" - consolidated_rows = [] - current_obj = dict.fromkeys([key for key in table[0]], '') - for item in table: - if not item: - consolidated_rows.append(current_obj) - current_obj = dict.fromkeys([key for key in table[0]], '') - continue - else: - for k, v in item.items(): - if v: - if not current_obj[k]: - current_obj[k] = v - else: - current_obj[k] = current_obj[k] + '\n' + v - - return consolidated_rows +def _create_table_dict(header: List[str], data: List[List[str]]) -> List[Dict[str, str]]: + return [dict(zip(header, r)) for r in data] def _parse_pretty(string: str) -> List: - string_lines = string.splitlines() - sep = '~~~' - data_sep = '===' - separator = ' ' + sep + ' ' - data_separator = ' ' + data_sep + ' ' + string_lines: List[str] = string.splitlines() + clean: List[Tuple[int, List[str]]] = _normalize_rows(string_lines) + raw_headers: List[List[str]] = _get_headers(clean) + raw_data: List[List[List[str]]] = _get_data(clean) - clean: Generator = _pretty_set_separators(string_lines, separator) - normalized: Generator = _pretty_normalize_rows(clean, separator, data_separator) - raw_table: List[Dict] = _pretty_table_parse(normalized) # was sparse_table_parse() - new_keynames: Dict = _pretty_map_new_keynames(raw_table, sep) - data_table: List[Optional[Dict]] = _pretty_remove_header_rows(raw_table, sep, data_sep) - table_with_renamed_keys: List[Optional[Dict]] = _pretty_rename_keys(data_table, new_keynames) - final_table: List[Dict] = _pretty_consolidate_rows(table_with_renamed_keys) + new_headers: List[str] = _collapse_headers(raw_headers) + new_data: List[List[str]] = _collapse_data(raw_data) + final_table: List[Dict[str, str]] = _create_table_dict(new_headers, new_data) return final_table