1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-07-13 01:20:24 +02:00

new streamlined parser

This commit is contained in:
Kelly Brazil
2022-03-21 13:06:34 -07:00
parent 9ecbdb0916
commit 51ae5ebcac

View File

@ -44,7 +44,7 @@ Examples:
[]
"""
import re
from typing import Iterable, List, Dict, Optional, Generator
from typing import Iterable, Tuple, List, Dict
import jc.utils
from jc.exceptions import ParseError
@ -73,11 +73,6 @@ def _process(proc_data: List[Dict]) -> List[Dict]:
List of Dictionaries. Structured to conform to the schema.
"""
# remove newlines from values
# for item in proc_data:
# for k, v in item.items():
# item[k] = v.replace('\n', '')
return proc_data
@ -115,213 +110,191 @@ def _table_sniff(string: str) -> str:
return 'simple'
def _pretty_set_separators(table_lines: Iterable, separator: str) -> Generator[str, None, None]:
"""Return a generator that yields rows standardized separators"""
for line in table_lines:
strip_line = line.strip()
# skip any blanks
if not strip_line:
continue
# yield row separators as a sentinel string
if strip_line.startswith('╒═') and strip_line.endswith('═╕')\
or strip_line.startswith('╞═') and strip_line.endswith('═╡')\
or strip_line.startswith('╘═') and strip_line.endswith('═╛')\
or strip_line.startswith('┌─') and strip_line.endswith('─┐')\
or strip_line.startswith('├─') and strip_line.endswith('─┤')\
or strip_line.startswith('└─') and strip_line.endswith('─┘')\
or strip_line.startswith('+=') and strip_line.endswith('=+')\
or strip_line.startswith('+-') and strip_line.endswith('-+'):
yield separator
continue
# remove the table column separator characters and yield the line
# line = line.replace('|', ' ').replace('│', ' ')
yield line
def _is_separator(line: str) -> bool:
"""Returns true if a table separator line is found"""
strip_line = line.strip()
if strip_line.startswith('╒═') and strip_line.endswith('═╕')\
or strip_line.startswith('╞═') and strip_line.endswith('═╡')\
or strip_line.startswith('╘═') and strip_line.endswith('═╛')\
or strip_line.startswith('┌─') and strip_line.endswith('─┐')\
or strip_line.startswith('├─') and strip_line.endswith('─┤')\
or strip_line.startswith('└─') and strip_line.endswith('─┘')\
or strip_line.startswith('+=') and strip_line.endswith('=+')\
or strip_line.startswith('+-') and strip_line.endswith('-+'):
return True
return False
def _pretty_normalize_rows(table_lines: Iterable,
separator: str,
data_separator: str) -> Generator[str, None, None]:
def _snake_case(line: str) -> str:
"""replace spaces between words with an underscores and set to lowercase"""
return re.sub(r'\b \b', '_', line).lower()
def _fixup_separators(line: str) -> str:
"""Normalize separators, and remove first and last separators"""
# normalize separator
line = line.replace('', '|')
# remove first separator if it is the first char in the line
if line[0] == '|':
line = line.replace('|', ' ', 1)
# remove last separator if it is the last char in the line
if line[-1] == '|':
line = line[::-1].replace('|', ' ', 1)[::-1]
return line
def _normalize_rows(table_lines: Iterable[str]) -> List[Tuple[int, List[str]]]:
"""
Return a generator that yields header and data rows with different separators.
Also removes spaces from headers.
Return a List of tuples of a row counters and data lines.
"""
result = []
header_found = False
data_found = False
row_counter = 0
# Removes initial table lines, finds the header row(s) and separates
# the header from the data rows with different separator characters.
for i in table_lines:
if separator in i and not header_found and not data_found:
# top table frame
for line in table_lines:
# skip blank lines
if not line.strip():
continue
if not separator in i and not header_found and not data_found:
# skip top table frame
if _is_separator(line) and not header_found and not data_found:
continue
# first header row found
if not _is_separator(line) and not header_found and not data_found:
header_found = True
# first header data found
# remove spaces from header
i = re.sub(r'\b \b', '_', i)
yield i
line = _snake_case(line)
line = _fixup_separators(line)
line_list = line.split('|')
line_list = [x.strip() for x in line_list]
result.append((row_counter, line_list))
continue
if not separator in i and header_found and not data_found:
# subsequent header data found
# remove spaces from header
i = re.sub(r'\b \b', '_', i)
yield i
# subsequent header row found
if not _is_separator(line) and header_found and not data_found:
line = _snake_case(line)
line = _fixup_separators(line)
line_list = line.split('|')
line_list = [x.strip() for x in line_list]
result.append((row_counter, line_list))
continue
if separator in i and header_found and not data_found:
# table separator found - this is a header separator
if _is_separator(line) and header_found and not data_found:
data_found = True
# table separator found - this is a header separator
yield separator
continue
if not separator in i and header_found and data_found:
# subsequent data row found
yield i
continue
if separator in i and header_found and data_found:
# table separator found - this is a data separator
yield data_separator
row_counter += 1
continue
# subsequent data row found
if not _is_separator(line) and header_found and data_found:
line = _fixup_separators(line)
line_list = line.split('|')
line_list = [x.strip() for x in line_list]
result.append((row_counter, line_list))
continue
def _pretty_table_parse(table: Iterable) -> List[Dict]:
temp_table = []
for line in table:
# normalize separator
line = line.replace('', '|')
# table separator found - this is a data separator
if _is_separator(line) and header_found and data_found:
row_counter += 1
continue
# remove first separator if it is the first char in the line
if line[0] == '|':
line = line.replace('|', ' ', 1)
# remove last separator if it is the last char in the line
if line[-1] == '|':
line = line[::-1].replace('|', ' ', 1)[::-1]
temp_table.append([x.strip() for x in line.split('|')])
headers = temp_table[0]
raw_data = temp_table[1:]
result = [dict.fromkeys(headers, None)]
result.extend([dict(zip(headers, r)) for r in raw_data])
return result
def _pretty_remove_header_rows(table: List[Dict], sep: str, data_sep: str) -> List[Optional[Dict]]:
"""return a table with only data rows."""
# create a new list of row objects with new key names
data_obj_list: List[Optional[Dict]] = []
sep_found = False
data_sep_found = False
for obj in table:
#skip to data
for v in obj.values():
if not sep_found and not str(v).strip() == sep:
continue
if not sep_found and str(v).strip() == sep:
sep_found = True
continue
# append data row objects or None for separators
if sep_found:
for k, v in obj.items():
if str(v).strip() == data_sep:
data_sep_found = True
break
else:
data_sep_found = False
if data_sep_found:
data_obj_list.append(None)
def _get_headers(table: Iterable[Tuple[int, List]]) -> List[List[str]]:
"""
return a list of all of the header rows (which are lists of strings.
[ # headers
['str', 'str', 'str'], # header rows
['str', 'str', 'str']
]
"""
result = []
for row_num, line in table:
if row_num == 0:
result.append(line)
return result
def _get_data(table: Iterable[Tuple[int, List]]) -> List[List[List[str]]]:
"""
return a list of rows, which are lists made up of lists of strings:
[ # data
[ # rows
['str', 'str', 'str'] # lines
]
]
"""
result: List[List[List[str]]] = []
current_row = 1
this_line: List[List[str]] = []
for row_num, line in table:
if row_num != 0:
if row_num != current_row:
result.append(this_line)
current_row = row_num
this_line = []
this_line.append(line)
if this_line:
result.append(this_line)
return result
def _collapse_headers(table: List[List[str]]) -> List[str]:
"""append each column string to return the full header list"""
result = table[0]
for line in table[1:]:
new_line: List[str] = []
for i, header in enumerate(line):
if header:
new_header = result[i] + '_' + header
new_header = re.sub(r'__+', '_', new_header)
new_line.append(new_header)
else:
data_obj_list.append(obj)
new_line.append(result[i])
result = new_line
# remove first item, which is a separator
return data_obj_list[1:]
return result
def _pretty_map_new_keynames(table: List[Dict], sep: str) -> Dict:
"""
returns a dict of old keyname to new keyname mappings by consolidating
multiline keynames from the input list of dictionaries.
"""
# first get all header objects to find full keynames. Stop when data rows are found.
header_obj_list = []
sep_found = False
for obj in table:
for v in obj.values():
if str(v).strip() == sep:
sep_found = True
break
if sep_found:
break
header_obj_list.append(obj)
def _collapse_data(table: List[List[List[str]]]) -> List[List[str]]:
"""combine data rows to return a simple list of lists"""
result: List[List[str]] = []
if not header_obj_list:
header_obj_list = [{key: None for key in table[0]}]
for row in table:
new_row: List[str] = []
for line in row:
if new_row:
for i, item in enumerate(line):
new_row[i] = (new_row[i] + '\n' + item).strip()
else:
new_row = line
# create an old-key to new-key name mapping dict
new_keynames_dict = dict.fromkeys([key for key in header_obj_list[0]], '')
for item in new_keynames_dict:
new_keynames_dict[item] = item
for obj in header_obj_list:
for k, v in obj.items():
if v:
new_keynames_dict[k] = new_keynames_dict[k] + '_' + v
result.append(new_row)
# normalize keynames so they are lowercase, no spaces, and no redundat '_'s
for k, v in new_keynames_dict.items():
new_keynames_dict[k] = v.replace(' ', '_').lower()
new_keynames_dict[k] = re.sub(r'__+', '_', v)
return new_keynames_dict
return result
def _pretty_rename_keys(table: List, new_keynames: Dict) -> List[Optional[Dict]]:
"""rename all of the keys in the table based on the new_keynames mapping"""
renamed_key_table: List[Optional[Dict]] = []
for item in table:
if item:
renamed_key_table.append({new_keynames[k]:v for k, v in item.items()})
else:
renamed_key_table.append(None)
return renamed_key_table
def _pretty_consolidate_rows(table: List) -> List[Dict]:
"""go through all data objects and combine values between data separators"""
consolidated_rows = []
current_obj = dict.fromkeys([key for key in table[0]], '')
for item in table:
if not item:
consolidated_rows.append(current_obj)
current_obj = dict.fromkeys([key for key in table[0]], '')
continue
else:
for k, v in item.items():
if v:
if not current_obj[k]:
current_obj[k] = v
else:
current_obj[k] = current_obj[k] + '\n' + v
return consolidated_rows
def _create_table_dict(header: List[str], data: List[List[str]]) -> List[Dict[str, str]]:
return [dict(zip(header, r)) for r in data]
def _parse_pretty(string: str) -> List:
string_lines = string.splitlines()
sep = '~~~'
data_sep = '==='
separator = ' ' + sep + ' '
data_separator = ' ' + data_sep + ' '
string_lines: List[str] = string.splitlines()
clean: List[Tuple[int, List[str]]] = _normalize_rows(string_lines)
raw_headers: List[List[str]] = _get_headers(clean)
raw_data: List[List[List[str]]] = _get_data(clean)
clean: Generator = _pretty_set_separators(string_lines, separator)
normalized: Generator = _pretty_normalize_rows(clean, separator, data_separator)
raw_table: List[Dict] = _pretty_table_parse(normalized) # was sparse_table_parse()
new_keynames: Dict = _pretty_map_new_keynames(raw_table, sep)
data_table: List[Optional[Dict]] = _pretty_remove_header_rows(raw_table, sep, data_sep)
table_with_renamed_keys: List[Optional[Dict]] = _pretty_rename_keys(data_table, new_keynames)
final_table: List[Dict] = _pretty_consolidate_rows(table_with_renamed_keys)
new_headers: List[str] = _collapse_headers(raw_headers)
new_data: List[List[str]] = _collapse_data(raw_data)
final_table: List[Dict[str, str]] = _create_table_dict(new_headers, new_data)
return final_table