1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-17 00:07:37 +02:00

move some functions to streaming

This commit is contained in:
Kelly Brazil
2022-02-06 08:55:42 -08:00
parent df00945b46
commit 45314c9808
2 changed files with 112 additions and 102 deletions

111
jc/streaming.py Normal file
View File

@ -0,0 +1,111 @@
"""jc - JSON CLI output utility streaming utils"""
from functools import wraps
from typing import Dict, Iterable
def stream_success(output_line: Dict, ignore_exceptions: bool) -> Dict:
"""Add `_jc_meta` object to output line if `ignore_exceptions=True`"""
if ignore_exceptions:
output_line.update({'_jc_meta': {'success': True}})
return output_line
def stream_error(e: BaseException, line: str) -> Dict:
"""
Return an error `_jc_meta` field.
"""
return {
'_jc_meta':
{
'success': False,
'error': f'{e.__class__.__name__}: {e}',
'line': line.strip()
}
}
def add_jc_meta(func):
"""
Decorator for streaming parsers to add stream_success and stream_error
objects. This simplifies the yield lines in the streaming parsers.
With the decorator on parse():
# successfully parsed line:
yield output_line if raw else _process(output_line)
# unsuccessfully parsed line:
except Exception as e:
yield raise_or_yield(ignore_exceptions, e, line)
Without the decorator on parse():
# successfully parsed line:
yield stream_success(output_line, ignore_exceptions) if raw \\
else stream_success(_process(output_line), ignore_exceptions)
# unsuccessfully parsed line:
except Exception as e:
yield stream_error(raise_or_yield(ignore_exceptions, e, line))
In all cases above:
output_line: (Dict) successfully parsed line yielded as a dict
e: (BaseException) exception object as the first value
of the tuple if the line was not successfully parsed.
line: (str) string of the original line that did not
successfully parse.
ignore_exceptions: (bool) continue processing lines and ignore
exceptions if True.
"""
@wraps(func)
def wrapper(*args, **kwargs):
ignore_exceptions = kwargs.get('ignore_exceptions', False)
gen = func(*args, **kwargs)
for value in gen:
# if the yielded value is a dict, then we know it was a
# successfully parsed line
if isinstance(value, dict):
yield stream_success(value, ignore_exceptions)
# otherwise it will be a tuple and we know it was an error
else:
exception_obj = value[0]
line = value[1]
yield stream_error(exception_obj, line)
return wrapper
def streaming_input_type_check(data: Iterable) -> None:
"""
Ensure input data is an iterable, but not a string or bytes. Raises
`TypeError` if not.
"""
if not hasattr(data, '__iter__') or isinstance(data, (str, bytes)):
raise TypeError("Input data must be a non-string iterable object.")
def streaming_line_input_type_check(line: str) -> None:
"""Ensure each line is a string. Raises `TypeError` if not."""
if not isinstance(line, str):
raise TypeError("Input line must be a 'str' object.")
def raise_or_yield(
ignore_exceptions: bool,
e: BaseException,
line: str
) -> tuple:
ignore_exceptions_msg = '... Use the ignore_exceptions option (-qq) to ignore streaming parser errors.'
if not ignore_exceptions:
e.args = (str(e) + ignore_exceptions_msg,)
raise e
return e, line

View File

@ -3,10 +3,9 @@ import sys
import re
import locale
import shutil
from functools import wraps
from datetime import datetime, timezone
from textwrap import TextWrapper
from typing import Dict, Iterable, List, Union, Optional
from typing import List, Union, Optional
def warning_message(message_lines: List[str]) -> None:
@ -222,112 +221,12 @@ def convert_to_bool(value: Union[str, int, float]) -> bool:
return False
def stream_success(output_line: Dict, ignore_exceptions: bool) -> Dict:
"""Add `_jc_meta` object to output line if `ignore_exceptions=True`"""
if ignore_exceptions:
output_line.update({'_jc_meta': {'success': True}})
return output_line
ignore_exceptions_msg = '... Use the ignore_exceptions option (-qq) to ignore streaming parser errors.'
def stream_error(e: BaseException, line: str) -> Dict:
"""
Return an error `_jc_meta` field.
"""
return {
'_jc_meta':
{
'success': False,
'error': f'{e.__class__.__name__}: {e}',
'line': line.strip()
}
}
def add_jc_meta(func):
"""
Decorator for streaming parsers to add stream_success and stream_error
objects. This simplifies the yield lines in the streaming parsers.
With the decorator on parse():
# successfully parsed line:
yield output_line if raw else _process(output_line)
# unsuccessfully parsed line:
except Exception as e:
if not ignore_exceptions:
e.args = (str(e) + ignore_exceptions_msg,)
raise e
yield e, line
Without the decorator on parse():
# successfully parsed line:
yield stream_success(output_line, ignore_exceptions) if raw else stream_success(_process(output_line), ignore_exceptions)
# unsuccessfully parsed line:
except Exception as e:
if not ignore_exceptions:
e.args = (str(e) + ignore_exceptions_msg,)
raise e
yield stream_error(e, line)
In all cases above:
output_line: (Dict): successfully parsed line yielded as a dict
e: (BaseException): exception object as the first value
of the tuple if the line was not successfully parsed.
line: (str): string of the original line that did not
successfully parse.
"""
@wraps(func)
def wrapper(*args, **kwargs):
ignore_exceptions = kwargs.get('ignore_exceptions', False)
gen = func(*args, **kwargs)
for value in gen:
# if the yielded value is a dict, then we know it was a
# successfully parsed line
if isinstance(value, dict):
yield stream_success(value, ignore_exceptions)
# otherwise it will be a tuple and we know it was an error
else:
exception_obj = value[0]
line = value[1]
yield stream_error(exception_obj, line)
return wrapper
def input_type_check(data: str) -> None:
"""Ensure input data is a string. Raises `TypeError` if not."""
if not isinstance(data, str):
raise TypeError("Input data must be a 'str' object.")
def streaming_input_type_check(data: Iterable) -> None:
"""
Ensure input data is an iterable, but not a string or bytes. Raises
`TypeError` if not.
"""
if not hasattr(data, '__iter__') or isinstance(data, (str, bytes)):
raise TypeError("Input data must be a non-string iterable object.")
def streaming_line_input_type_check(line: str) -> None:
"""Ensure each line is a string. Raises `TypeError` if not."""
if not isinstance(line, str):
raise TypeError("Input line must be a 'str' object.")
class timestamp:
def __init__(self, datetime_string: str) -> None:
"""