1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-25 00:37:31 +02:00

use jc_types

This commit is contained in:
Kelly Brazil
2022-10-18 15:46:29 -07:00
parent 2c4f232aaa
commit d7ca6caae8

View File

@ -77,12 +77,14 @@ import jc.utils
from jc.streaming import ( from jc.streaming import (
add_jc_meta, streaming_input_type_check, streaming_line_input_type_check, raise_or_yield add_jc_meta, streaming_input_type_check, streaming_line_input_type_check, raise_or_yield
) )
from typing import Iterable
from jc.jc_types import JSONDictType, StreamingOutputType
from jc.exceptions import ParseError from jc.exceptions import ParseError
class info(): class info():
"""Provides parser metadata (version, author, etc.)""" """Provides parser metadata (version, author, etc.)"""
version = '1.1' version = '1.2'
description = '`stat` command streaming parser' description = '`stat` command streaming parser'
author = 'Kelly Brazil' author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com' author_email = 'kellyjonbrazil@gmail.com'
@ -93,7 +95,7 @@ class info():
__version__ = info.version __version__ = info.version
def _process(proc_data): def _process(proc_data: JSONDictType) -> JSONDictType:
""" """
Final processing to conform to the schema. Final processing to conform to the schema.
@ -105,10 +107,10 @@ def _process(proc_data):
Dictionary. Structured data to conform to the schema. Dictionary. Structured data to conform to the schema.
""" """
int_list = {'size', 'blocks', 'io_blocks', 'inode', 'links', 'uid', 'gid', int_list: set[str] = {'size', 'blocks', 'io_blocks', 'inode', 'links', 'uid', 'gid',
'unix_device', 'rdev', 'block_size'} 'unix_device', 'rdev', 'block_size'}
null_list = {'access_time', 'modify_time', 'change_time', 'birth_time'} null_list: set[str] = {'access_time', 'modify_time', 'change_time', 'birth_time'}
for key in proc_data.copy(): for key in proc_data.copy():
if key in int_list: if key in int_list:
@ -118,15 +120,23 @@ def _process(proc_data):
if key in null_list: if key in null_list:
if proc_data[key] == '-': if proc_data[key] == '-':
proc_data[key] = None proc_data[key] = None
ts = jc.utils.timestamp(proc_data[key], format_hint=(7100, 7200))
proc_data[key + '_epoch'] = ts.naive ts_string = proc_data[key]
proc_data[key + '_epoch_utc'] = ts.utc if isinstance(ts_string, str) or ts_string is None:
ts = jc.utils.timestamp(ts_string, format_hint=(7100, 7200))
proc_data[key + '_epoch'] = ts.naive
proc_data[key + '_epoch_utc'] = ts.utc
return proc_data return proc_data
@add_jc_meta @add_jc_meta
def parse(data, raw=False, quiet=False, ignore_exceptions=False): def parse(
data: Iterable[str],
raw: bool = False,
quiet: bool = False,
ignore_exceptions: bool = False
) -> StreamingOutputType:
""" """
Main text parsing generator function. Returns an iterable object. Main text parsing generator function. Returns an iterable object.
@ -146,7 +156,7 @@ def parse(data, raw=False, quiet=False, ignore_exceptions=False):
jc.utils.compatibility(__name__, info.compatible, quiet) jc.utils.compatibility(__name__, info.compatible, quiet)
streaming_input_type_check(data) streaming_input_type_check(data)
output_line = {} output_line: JSONDictType = {}
os_type = '' os_type = ''
for line in data: for line in data:
@ -174,14 +184,16 @@ def parse(data, raw=False, quiet=False, ignore_exceptions=False):
output_line['file'] = line_list[1] output_line['file'] = line_list[1]
# populate link_to field if -> found # populate link_to field if -> found
if ' -> ' in output_line['file']: file_string = output_line['file']
filename = output_line['file'].split(' -> ')[0].strip('\u2018').rstrip('\u2019') if isinstance(file_string, str):
link = output_line['file'].split(' -> ')[1].strip('\u2018').rstrip('\u2019') if ' -> ' in file_string:
output_line['file'] = filename filename = file_string.split(' -> ')[0].strip('\u2018').rstrip('\u2019')
output_line['link_to'] = link link = file_string.split(' -> ')[1].strip('\u2018').rstrip('\u2019')
else: output_line['file'] = filename
filename = output_line['file'].split(' -> ')[0].strip('\u2018').rstrip('\u2019') output_line['link_to'] = link
output_line['file'] = filename else:
filename = file_string.split(' -> ')[0].strip('\u2018').rstrip('\u2019')
output_line['file'] = filename
continue continue