diff --git a/jc/parsers/stat_s.py b/jc/parsers/stat_s.py index 92f26d5c..cffd80c0 100644 --- a/jc/parsers/stat_s.py +++ b/jc/parsers/stat_s.py @@ -77,12 +77,14 @@ import jc.utils from jc.streaming import ( add_jc_meta, streaming_input_type_check, streaming_line_input_type_check, raise_or_yield ) +from typing import Iterable +from jc.jc_types import JSONDictType, StreamingOutputType from jc.exceptions import ParseError class info(): """Provides parser metadata (version, author, etc.)""" - version = '1.1' + version = '1.2' description = '`stat` command streaming parser' author = 'Kelly Brazil' author_email = 'kellyjonbrazil@gmail.com' @@ -93,7 +95,7 @@ class info(): __version__ = info.version -def _process(proc_data): +def _process(proc_data: JSONDictType) -> JSONDictType: """ Final processing to conform to the schema. @@ -105,10 +107,10 @@ def _process(proc_data): Dictionary. Structured data to conform to the schema. """ - int_list = {'size', 'blocks', 'io_blocks', 'inode', 'links', 'uid', 'gid', + int_list: set[str] = {'size', 'blocks', 'io_blocks', 'inode', 'links', 'uid', 'gid', 'unix_device', 'rdev', 'block_size'} - null_list = {'access_time', 'modify_time', 'change_time', 'birth_time'} + null_list: set[str] = {'access_time', 'modify_time', 'change_time', 'birth_time'} for key in proc_data.copy(): if key in int_list: @@ -118,15 +120,23 @@ def _process(proc_data): if key in null_list: if proc_data[key] == '-': proc_data[key] = None - ts = jc.utils.timestamp(proc_data[key], format_hint=(7100, 7200)) - proc_data[key + '_epoch'] = ts.naive - proc_data[key + '_epoch_utc'] = ts.utc + + ts_string = proc_data[key] + if isinstance(ts_string, str) or ts_string is None: + ts = jc.utils.timestamp(ts_string, format_hint=(7100, 7200)) + proc_data[key + '_epoch'] = ts.naive + proc_data[key + '_epoch_utc'] = ts.utc return proc_data @add_jc_meta -def parse(data, raw=False, quiet=False, ignore_exceptions=False): +def parse( + data: Iterable[str], + raw: bool = False, + quiet: bool = False, + ignore_exceptions: bool = False +) -> StreamingOutputType: """ Main text parsing generator function. Returns an iterable object. @@ -146,7 +156,7 @@ def parse(data, raw=False, quiet=False, ignore_exceptions=False): jc.utils.compatibility(__name__, info.compatible, quiet) streaming_input_type_check(data) - output_line = {} + output_line: JSONDictType = {} os_type = '' for line in data: @@ -174,14 +184,16 @@ def parse(data, raw=False, quiet=False, ignore_exceptions=False): output_line['file'] = line_list[1] # populate link_to field if -> found - if ' -> ' in output_line['file']: - filename = output_line['file'].split(' -> ')[0].strip('\u2018').rstrip('\u2019') - link = output_line['file'].split(' -> ')[1].strip('\u2018').rstrip('\u2019') - output_line['file'] = filename - output_line['link_to'] = link - else: - filename = output_line['file'].split(' -> ')[0].strip('\u2018').rstrip('\u2019') - output_line['file'] = filename + file_string = output_line['file'] + if isinstance(file_string, str): + if ' -> ' in file_string: + filename = file_string.split(' -> ')[0].strip('\u2018').rstrip('\u2019') + link = file_string.split(' -> ')[1].strip('\u2018').rstrip('\u2019') + output_line['file'] = filename + output_line['link_to'] = link + else: + filename = file_string.split(' -> ')[0].strip('\u2018').rstrip('\u2019') + output_line['file'] = filename continue