1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-17 00:07:37 +02:00
Files
jc/jc/utils.py
2022-02-03 15:44:18 -08:00

514 lines
20 KiB
Python

"""jc - JSON CLI output utility utils"""
import sys
import re
import locale
import shutil
from functools import wraps
from datetime import datetime, timezone
from textwrap import TextWrapper
from typing import Dict, Iterable, List, Union, Optional
def warning_message(message_lines: List[str]) -> None:
    """
    Prints warning message for non-fatal issues. The first line is
    prepended with 'jc: Warning - ' and subsequent lines are indented.
    Wraps text as needed based on the terminal width.

    Parameters:

        message_lines:   (list) list of string lines. A single string is
                         also accepted and treated as a one-line message.
                         The list is not modified. An empty list prints
                         nothing.

    Returns:

        None - just prints output to STDERR
    """
    # this is for backwards compatibility with existing custom parsers
    if isinstance(message_lines, str):
        message_lines = [message_lines]

    # work on a copy so the caller's list is never mutated
    lines = list(message_lines)
    if not lines:
        return

    columns = shutil.get_terminal_size().columns

    first_wrapper = TextWrapper(width=columns, subsequent_indent=' ' * 15)
    next_wrapper = TextWrapper(width=columns, initial_indent=' ' * 15,
                               subsequent_indent=' ' * 19)

    first_line, *remaining = lines
    print(first_wrapper.fill(f'jc: Warning - {first_line}'), file=sys.stderr)

    for line in remaining:
        # blank lines are skipped rather than printed as empty output
        if line == '':
            continue
        print(next_wrapper.fill(line), file=sys.stderr)
def error_message(message_lines: List[str]) -> None:
    """
    Prints an error message for fatal issues. The first line is
    prepended with 'jc: Error - ' and subsequent lines are indented.
    Wraps text as needed based on the terminal width.

    Parameters:

        message_lines:   (list) list of string lines. A single string is
                         also accepted and treated as a one-line message.
                         The list is not modified. An empty list prints
                         nothing.

    Returns:

        None - just prints output to STDERR
    """
    # accept a bare string the same way warning_message() does
    if isinstance(message_lines, str):
        message_lines = [message_lines]

    # work on a copy so the caller's list is never mutated
    lines = list(message_lines)
    if not lines:
        return

    columns = shutil.get_terminal_size().columns

    first_wrapper = TextWrapper(width=columns, subsequent_indent=' ' * 13)
    next_wrapper = TextWrapper(width=columns, initial_indent=' ' * 13,
                               subsequent_indent=' ' * 17)

    first_line, *remaining = lines
    print(first_wrapper.fill(f'jc: Error - {first_line}'), file=sys.stderr)

    for line in remaining:
        # blank lines are skipped rather than printed as empty output
        if line == '':
            continue
        print(next_wrapper.fill(line), file=sys.stderr)
def compatibility(mod_name: str, compatible: List, quiet: bool = False) -> None:
    """
    Warns when the running OS platform is not one the parser supports.

    Parameters:

        mod_name:     (string) __name__ of the calling module

        compatible:   (list) sys.platform name(s) compatible with
                      the parser. compatible options:
                      linux, darwin, cygwin, win32, aix, freebsd

        quiet:        (bool) suppress compatibility message if True

    Returns:

        None - just prints output to STDERR
    """
    if quiet:
        return

    # a platform matches when sys.platform begins with its name
    if any(sys.platform.startswith(name) for name in compatible):
        return

    parser_name = mod_name.split('.')[-1]
    supported = ', '.join(compatible)
    warning_message([
        f'{parser_name} parser not compatible with your OS ({sys.platform}).',
        f'Compatible platforms: {supported}'
    ])
def has_data(data: str) -> bool:
    """
    Reports whether the input holds at least one non-whitespace
    character.

    Parameters:

        data:        (string) input to check whether it contains data

    Returns:

        Boolean      True if input string (data) contains non-whitespace
                     characters, otherwise False
    """
    # empty (or otherwise falsy) input has no data by definition
    if not data:
        return False

    return not data.isspace()
def convert_to_int(value: Union[str, float]) -> Optional[int]:
    """
    Converts string and float input to int. Strips all non-numeric
    characters from strings.

    Parameters:

        value:         (string/float) Input value

    Returns:

        integer/None   Integer if successful conversion, otherwise None.
                       Values with no integer representation (NaN,
                       infinity) also return None.
    """
    if isinstance(value, str):
        # keep only digits, minus signs, and decimal points
        str_val = re.sub(r'[^0-9\-\.]', '', value)

        try:
            return int(str_val)
        except (ValueError, TypeError):
            pass

        # fall back to float parsing so strings like '1.9' truncate to 1.
        # A very long digit string parses to float infinity, which raises
        # OverflowError on int(), so that is treated as a failed conversion.
        try:
            return int(float(str_val))
        except (ValueError, TypeError, OverflowError):
            return None

    if isinstance(value, (int, float)):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # int(nan) raises ValueError; int(+/-inf) raises OverflowError
            return None

    return None
def convert_to_float(value: Union[str, int]) -> Optional[float]:
    """
    Converts string and int input to float. Strips all non-numeric
    characters from strings.

    Parameters:

        value:         (string/integer) Input value

    Returns:

        float/None     Float if successful conversion, otherwise None.
                       Integers too large to represent as a float also
                       return None.
    """
    if isinstance(value, str):
        try:
            # keep only digits, minus signs, and decimal points
            return float(re.sub(r'[^0-9\-\.]', '', value))
        except (ValueError, TypeError):
            return None

    if isinstance(value, (int, float)):
        try:
            return float(value)
        except OverflowError:
            # int too large for a float
            return None

    return None
def convert_to_bool(value: Union[str, int, float]) -> bool:
    """
    Converts string, integer, or float input to boolean by checking
    for 'truthy' values.

    Parameters:

        value:          (string/integer/float) Input value

    Returns:

        True/False      False unless a 'truthy' number or string is found
                        ('y', 'yes', 'true', '1', 1, -1, etc.)
    """
    truthy = ('y', 'yes', 'true', '*')

    # numbers: plain truthiness
    if isinstance(value, (int, float)):
        return bool(value)

    if isinstance(value, str):
        # a string that converts to a number takes the number's truthiness
        numeric = convert_to_float(value)
        if numeric is not None:
            return bool(numeric)

        # otherwise only the known truthy words count (case-insensitive)
        return value.lower() in truthy

    # any other type (None, list, etc.) is False
    return False
def stream_success(output_line: Dict, ignore_exceptions: bool) -> Dict:
    """
    Tag a parsed line as successful when `ignore_exceptions=True`.

    The `_jc_meta` object is added to `output_line` in place and the same
    dictionary is returned. When `ignore_exceptions` is False the line is
    returned untouched.
    """
    if not ignore_exceptions:
        return output_line

    output_line.update(_jc_meta={'success': True})
    return output_line
def stream_error(e: BaseException, ignore_exceptions: bool, line: str) -> Dict:
    """
    Report a streaming parser failure.

    With `ignore_exceptions=True`, returns a dictionary holding a
    `_jc_meta` object that records the failure, the exception text, and
    the stripped input line. Otherwise the exception's message is extended
    with a hint about the ignore_exceptions option and the exception is
    re-raised.
    """
    if ignore_exceptions:
        error_text = f'{e.__class__.__name__}: {e}'
        meta = {
            'success': False,
            'error': error_text,
            'line': line.strip()
        }
        return {'_jc_meta': meta}

    hint = '... Use the ignore_exceptions option (-qq) to ignore streaming parser errors.'
    e.args = (str(e) + hint,)
    raise e
def add_jc_meta(func):
    """
    Decorator for streaming parsers to add stream_success and stream_error
    objects. This simplifies the yield lines in the streaming parsers.

    With the decorator on parse():

        # successfully parsed line:
        yield output_line if raw else _process(output_line)

        # unsuccessfully parsed line:
        except Exception as e:
            yield e, line

    Without the decorator on parse():

        # successfully parsed line:
        yield stream_success(output_line, ignore_exceptions) if raw else stream_success(_process(output_line), ignore_exceptions)

        # unsuccessfully parsed line:
        except Exception as e:
            yield stream_error(e, ignore_exceptions, line)

    In all cases above:

        output_line:  (Dict):  successfully parsed line yielded as a dict

        e:            (BaseException):  exception object as the first value
                      of the tuple if the line was not successfully parsed.

        line:         (str):  string of the original line that did not
                      successfully parse.

    The `ignore_exceptions` value is taken from the call whether it is
    passed by keyword or positionally. It is treated as False when not
    passed.
    """
    # local import keeps this fix self-contained within the decorator
    import inspect

    # Work out, once per decorated function, which positional slot (if any)
    # carries `ignore_exceptions`, so a positionally passed value is seen.
    positional_kinds = (inspect.Parameter.POSITIONAL_ONLY,
                        inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ignore_idx = None
    try:
        parameters = list(inspect.signature(func).parameters.values())
    except (TypeError, ValueError):
        # signature not introspectable: fall back to keyword-only lookup
        parameters = []

    for idx, param in enumerate(parameters):
        if param.kind not in positional_kinds:
            break
        if param.name == 'ignore_exceptions':
            ignore_idx = idx
            break

    @wraps(func)
    def wrapper(*args, **kwargs):
        ignore_exceptions = kwargs.get('ignore_exceptions', False)
        if ignore_idx is not None and len(args) > ignore_idx:
            ignore_exceptions = args[ignore_idx]

        for value in func(*args, **kwargs):
            # if the yielded value is a dict, then we know it was a
            # successfully parsed line
            if isinstance(value, dict):
                yield stream_success(value, ignore_exceptions)

            # otherwise it will be a tuple and we know it was an error
            else:
                exception_obj, line = value[0], value[1]
                yield stream_error(exception_obj, ignore_exceptions, line)

    return wrapper
def input_type_check(data: str) -> None:
    """
    Validate that the parser input is a string.

    Raises `TypeError` when `data` is not a `str` instance; otherwise
    returns None.
    """
    if isinstance(data, str):
        return

    raise TypeError("Input data must be a 'str' object.")
def streaming_input_type_check(data: Iterable) -> None:
    """
    Validate that the streaming parser input is an iterable that is
    neither a string nor bytes.

    Raises `TypeError` when `data` has no `__iter__` attribute or is a
    `str`/`bytes` instance; otherwise returns None.
    """
    is_iterable = hasattr(data, '__iter__')

    if is_iterable and not isinstance(data, (str, bytes)):
        return

    raise TypeError("Input data must be a non-string iterable object.")
def streaming_line_input_type_check(line: str) -> None:
    """
    Validate that a single streamed line is a string.

    Raises `TypeError` when `line` is not a `str` instance; otherwise
    returns None.
    """
    if isinstance(line, str):
        return

    raise TypeError("Input line must be a 'str' object.")
class timestamp:
    """Converts a date-time string of several formats to epoch timestamps."""

    # strptime rules, tried in order. `id` is what gets reported as the
    # `format` attribute. `locale` is None for rules parsed with the LC_TIME
    # locale already in effect, and '' for rules that first switch LC_TIME
    # to the user's default locale.
    _FORMATS = (
        {'id': 1000, 'format': '%a %b %d %H:%M:%S %Y', 'locale': None},  # manual C locale format conversion: Tue Mar 23 16:12:11 2021 or Tue Mar 23 16:12:11 IST 2021
        {'id': 1500, 'format': '%Y-%m-%d %H:%M', 'locale': None},  # en_US.UTF-8 local format (found in who cli output): 2021-03-23 00:14
        {'id': 1600, 'format': '%m/%d/%Y %I:%M %p', 'locale': None},  # Windows english format (found in dir cli output): 12/07/2019 02:09 AM
        {'id': 1700, 'format': '%m/%d/%Y, %I:%M:%S %p', 'locale': None},  # Windows english format with non-UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC-0600)
        {'id': 1705, 'format': '%m/%d/%Y, %I:%M:%S %p %Z', 'locale': None},  # Windows english format with UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC)
        {'id': 1710, 'format': '%m/%d/%Y, %I:%M:%S %p UTC%z', 'locale': None},  # Windows english format with UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC+0000)
        {'id': 2000, 'format': '%a %d %b %Y %I:%M:%S %p %Z', 'locale': None},  # en_US.UTF-8 local format (found in upower cli output): Tue 23 Mar 2021 04:12:11 PM UTC
        {'id': 3000, 'format': '%a %d %b %Y %I:%M:%S %p', 'locale': None},  # en_US.UTF-8 local format with non-UTC tz (found in upower cli output): Tue 23 Mar 2021 04:12:11 PM IST
        {'id': 4000, 'format': '%A %d %B %Y %I:%M:%S %p %Z', 'locale': None},  # European-style local format (found in upower cli output): Tuesday 01 October 2019 12:50:41 PM UTC
        {'id': 5000, 'format': '%A %d %B %Y %I:%M:%S %p', 'locale': None},  # European-style local format with non-UTC tz (found in upower cli output): Tuesday 01 October 2019 12:50:41 PM IST
        {'id': 6000, 'format': '%a %b %d %I:%M:%S %p %Z %Y', 'locale': None},  # en_US.UTF-8 format (found in date cli): Wed Mar 24 06:16:19 PM UTC 2021
        {'id': 7000, 'format': '%a %b %d %H:%M:%S %Z %Y', 'locale': None},  # C locale format (found in date cli): Wed Mar 24 11:11:30 UTC 2021
        {'id': 7100, 'format': '%b %d %H:%M:%S %Y', 'locale': None},  # C locale format (found in stat cli output - osx): Mar 29 11:49:05 2021
        {'id': 7200, 'format': '%Y-%m-%d %H:%M:%S.%f %z', 'locale': None},  # C locale format (found in stat cli output - linux): 2019-08-13 18:13:43.555604315 -0400
        {'id': 7250, 'format': '%Y-%m-%d %H:%M:%S', 'locale': None},  # C locale format with non-UTC tz (found in modified vmstat cli output): 2021-09-16 20:32:28 PDT
        {'id': 7255, 'format': '%Y-%m-%d %H:%M:%S %Z', 'locale': None},  # C locale format (found in modified vmstat cli output): 2021-09-16 20:32:28 UTC
        {'id': 7300, 'format': '%a %Y-%m-%d %H:%M:%S %Z', 'locale': None},  # C locale format (found in timedatectl cli output): Wed 2020-03-11 00:53:21 UTC
        # attempt locale changes last
        {'id': 8000, 'format': '%a %d %b %Y %H:%M:%S %Z', 'locale': ''},  # current locale format (found in upower cli output): mar. 23 mars 2021 23:12:11 UTC
        {'id': 8100, 'format': '%a %d %b %Y %H:%M:%S', 'locale': ''},  # current locale format with non-UTC tz (found in upower cli output): mar. 23 mars 2021 19:12:11 EDT
        {'id': 8200, 'format': '%A %d %B %Y, %H:%M:%S UTC%z', 'locale': ''},  # fr_FR.utf8 locale format (found in date cli output): vendredi 26 mars 2021, 13:26:46 (UTC+0000)
        {'id': 8300, 'format': '%A %d %B %Y, %H:%M:%S', 'locale': ''},  # fr_FR.utf8 locale format with non-UTC tz (found in date cli output): vendredi 26 mars 2021, 13:26:46 (UTC-0400)
        {'id': 9000, 'format': '%c', 'locale': ''},  # locally configured locale format conversion: Could be anything :) this is a last-gasp attempt
    )

    # Timezone terms removed from the input before parsing.
    # from https://www.timeanddate.com/time/zones/
    # only removed UTC timezone and added known non-UTC offsets
    _TZ_ABBR = frozenset({
        'A', 'ACDT', 'ACST', 'ACT', 'ACWST', 'ADT', 'AEDT', 'AEST', 'AET', 'AFT', 'AKDT',
        'AKST', 'ALMT', 'AMST', 'AMT', 'ANAST', 'ANAT', 'AQTT', 'ART', 'AST', 'AT', 'AWDT',
        'AWST', 'AZOST', 'AZOT', 'AZST', 'AZT', 'AoE', 'B', 'BNT', 'BOT', 'BRST', 'BRT', 'BST',
        'BTT', 'C', 'CAST', 'CAT', 'CCT', 'CDT', 'CEST', 'CET', 'CHADT', 'CHAST', 'CHOST',
        'CHOT', 'CHUT', 'CIDST', 'CIST', 'CKT', 'CLST', 'CLT', 'COT', 'CST', 'CT', 'CVT', 'CXT',
        'ChST', 'D', 'DAVT', 'DDUT', 'E', 'EASST', 'EAST', 'EAT', 'ECT', 'EDT', 'EEST', 'EET',
        'EGST', 'EGT', 'EST', 'ET', 'F', 'FET', 'FJST', 'FJT', 'FKST', 'FKT', 'FNT', 'G',
        'GALT', 'GAMT', 'GET', 'GFT', 'GILT', 'GMT', 'GST', 'GYT', 'H', 'HDT', 'HKT', 'HOVST',
        'HOVT', 'HST', 'I', 'ICT', 'IDT', 'IOT', 'IRDT', 'IRKST', 'IRKT', 'IRST', 'IST', 'JST',
        'K', 'KGT', 'KOST', 'KRAST', 'KRAT', 'KST', 'KUYT', 'L', 'LHDT', 'LHST', 'LINT', 'M',
        'MAGST', 'MAGT', 'MART', 'MAWT', 'MDT', 'MHT', 'MMT', 'MSD', 'MSK', 'MST', 'MT', 'MUT',
        'MVT', 'MYT', 'N', 'NCT', 'NDT', 'NFDT', 'NFT', 'NOVST', 'NOVT', 'NPT', 'NRT', 'NST',
        'NUT', 'NZDT', 'NZST', 'O', 'OMSST', 'OMST', 'ORAT', 'P', 'PDT', 'PET', 'PETST', 'PETT',
        'PGT', 'PHOT', 'PHT', 'PKT', 'PMDT', 'PMST', 'PONT', 'PST', 'PT', 'PWT', 'PYST', 'PYT',
        'Q', 'QYZT', 'R', 'RET', 'ROTT', 'S', 'SAKT', 'SAMT', 'SAST', 'SBT', 'SCT', 'SGT',
        'SRET', 'SRT', 'SST', 'SYOT', 'T', 'TAHT', 'TFT', 'TJT', 'TKT', 'TLT', 'TMT', 'TOST',
        'TOT', 'TRT', 'TVT', 'U', 'ULAST', 'ULAT', 'UYST', 'UYT', 'UZT', 'V', 'VET', 'VLAST',
        'VLAT', 'VOST', 'VUT', 'W', 'WAKT', 'WARST', 'WAST', 'WAT', 'WEST', 'WET', 'WFT',
        'WGST', 'WGT', 'WIB', 'WIT', 'WITA', 'WST', 'WT', 'X', 'Y', 'YAKST', 'YAKT', 'YAPT',
        'YEKST', 'YEKT', 'Z', 'UTC-1200', 'UTC-1100', 'UTC-1000', 'UTC-0930', 'UTC-0900',
        'UTC-0800', 'UTC-0700', 'UTC-0600', 'UTC-0500', 'UTC-0400', 'UTC-0300', 'UTC-0230',
        'UTC-0200', 'UTC-0100', 'UTC+0100', 'UTC+0200', 'UTC+0300', 'UTC+0400', 'UTC+0430',
        'UTC+0500', 'UTC+0530', 'UTC+0545', 'UTC+0600', 'UTC+0630', 'UTC+0700', 'UTC+0800',
        'UTC+0845', 'UTC+0900', 'UTC+1000', 'UTC+1030', 'UTC+1100', 'UTC+1200', 'UTC+1300',
        'UTC+1345', 'UTC+1400',
    })

    # matches a time with more than 6 subsecond digits so the extra digits
    # can be dropped (strptime's %f accepts at most 6)
    _SUBSECOND_RE = re.compile(r'(\W\d\d:\d\d:\d\d\.\d{6})\d+\W')

    def __init__(self, datetime_string: str) -> None:
        """
        Input a date-time text string of several formats and convert to a
        naive or timezone-aware epoch timestamp in UTC.

        Parameters:

            datetime_string (str):  a string representation of a
                datetime in several supported formats

        Returns a timestamp object with the following attributes:

            string (str):  the input datetime string

            format (int | None):  the format rule that was used to decode
                the datetime string. None if conversion fails.

            naive (int | None):  timestamp based on locally configured
                timezone. None if conversion fails.

            utc (int | None):  aware timestamp only if UTC timezone
                detected in datetime string. None if conversion fails.
        """
        self.string = datetime_string
        dt = self._parse()
        self.format = dt['format']
        self.naive = dt['timestamp_naive']
        self.utc = dt['timestamp_utc']

    def __repr__(self):
        return f'timestamp(string="{self.string}", format={self.format}, naive={self.naive}, utc={self.utc})'

    def _parse(self) -> Dict[str, Optional[int]]:
        """
        Convert `self.string` to a naive and, when the UTC timezone is
        detected, a timezone-aware epoch timestamp.

        Takes no parameters; the input is read from `self.string` (None is
        treated as an empty string).

        Returns:

            Dictionary of the following format:

                {
                    # for debugging purposes. None if conversion fails
                    "format":               int,

                    # timestamp based on locally configured timezone.
                    # None if conversion fails.
                    "timestamp_naive":      int,

                    # aware timestamp only if UTC timezone detected.
                    # None if conversion fails.
                    "timestamp_utc":        int
                }

            The `format` integer denotes which date_time format
            conversion succeeded.

            The `timestamp_naive` integer is the converted date-time
            string to a naive epoch timestamp.

            The `timestamp_utc` integer is the converted date-time
            string to an aware epoch timestamp in the UTC timezone. If
            an aware conversion cannot be performed (e.g. the UTC
            timezone is not found in the date-time string), then this
            field will be None.

            If the conversion completely fails, all fields will be None.

        The LC_TIME locale in effect when this method is called is
        restored before it returns.
        """
        data = self.string or ''
        timestamp_obj: Dict[str, Optional[int]] = {
            'format': None,
            'timestamp_naive': None,
            'timestamp_utc': None
        }

        # sometimes UTC is referenced as 'Coordinated Universal Time'. Convert to 'UTC'
        data = data.replace('Coordinated Universal Time', 'UTC')

        utc_tz = False
        if 'UTC' in data:
            utc_tz = True
            # 'UTC+hhmm' / 'UTC-hhmm' only counts as UTC for a zero offset
            if 'UTC+' in data or 'UTC-' in data:
                utc_tz = bool('UTC+0000' in data or 'UTC-0000' in data)
        elif '+0000' in data or '-0000' in data:
            utc_tz = True

        # normalize the timezone by taking out any timezone reference, except UTC
        cleandata = data.replace('(', '').replace(')', '')
        normalized_datetime = ' '.join(
            term for term in cleandata.split() if term not in self._TZ_ABBR
        )

        # normalize further by converting any greater-than 6-digit subsecond to 6-digits
        normalized_datetime = self._SUBSECOND_RE.sub(r'\g<1> ', normalized_datetime)

        dt = None
        timestamp_naive = None

        # setlocale() with no locale argument only queries, so remember the
        # current LC_TIME here and put it back explicitly when done
        saved_locale = locale.setlocale(locale.LC_TIME)
        locale_changed = False
        try:
            for fmt in self._FORMATS:
                try:
                    if fmt['locale'] is not None:
                        locale_changed = True
                        locale.setlocale(locale.LC_TIME, fmt['locale'])

                    candidate = datetime.strptime(normalized_datetime, fmt['format'])
                    timestamp_naive = int(candidate.replace(tzinfo=None).timestamp())
                except Exception:
                    # any failure just means this rule does not apply
                    # (strptime mismatch, unsupported locale, or a
                    # timestamp the platform cannot represent)
                    continue

                dt = candidate
                timestamp_obj['format'] = fmt['id']
                break
        finally:
            if locale_changed:
                locale.setlocale(locale.LC_TIME, saved_locale)

        # compare against None: an epoch of 0 is a valid result
        if dt is not None and timestamp_naive is not None:
            timestamp_obj['timestamp_naive'] = timestamp_naive
            if utc_tz:
                dt_utc = dt.replace(tzinfo=timezone.utc)
                timestamp_obj['timestamp_utc'] = int(dt_utc.timestamp())

        return timestamp_obj