1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-17 00:07:37 +02:00
Files
jc/jc/utils.py
Kelly Brazil 3a7ca118ff doc updates
2025-05-27 09:06:26 -07:00

859 lines
35 KiB
Python

"""jc - JSON Convert utils"""
import sys
import re
import locale
import shutil
from itertools import islice
from collections import namedtuple
from numbers import Number
from datetime import datetime, timezone
from textwrap import TextWrapper
from functools import lru_cache
from typing import Any, List, Dict, Iterable, Union, Optional, TextIO
from .jc_types import TimeStampFormatType
CLI_QUIET = False
def _asciify(string: str) -> str:
"""
Return a string downgraded from Unicode to ASCII with some simple
conversions.
"""
string = string.replace('©', '(c)')
# the ascii() function adds single quotes around the string
string = ascii(string)[1:-1]
string = string.replace(r'\n', '\n')
return string
def _safe_print(
string: str,
sep: str = ' ',
end: str = '\n',
file: TextIO = sys.stdout,
flush: bool = False
) -> None:
"""Output for both UTF-8 and ASCII encoding systems"""
try:
print(string, sep=sep, end=end, file=file, flush=flush)
except UnicodeEncodeError:
print(_asciify(string), sep=sep, end=end, file=file, flush=flush)
def _safe_pager(string: str) -> None:
"""Pager output for both UTF-8 and ASCII encoding systems"""
from pydoc import pager
try:
pager(string)
except UnicodeEncodeError:
pager(_asciify(string))
def warning_message(message_lines: List[str]) -> None:
"""
Prints warning message to `STDERR` for non-fatal issues. The first line
is prepended with 'jc: Warning - ' and subsequent lines are indented.
Wraps text as needed based on the terminal width.
Parameters:
message: (list) list of string lines
Returns:
None - just prints output to STDERR
"""
if CLI_QUIET:
return
# this is for backwards compatibility with existing custom parsers
if isinstance(message_lines, str):
message_lines = [message_lines]
columns = shutil.get_terminal_size().columns
first_wrapper = TextWrapper(width=columns, subsequent_indent=' ' * 15)
next_wrapper = TextWrapper(width=columns, initial_indent=' ' * 15,
subsequent_indent=' ' * 19)
first_line = message_lines.pop(0)
first_str = f'jc: Warning - {first_line}'
first_str = first_wrapper.fill(first_str)
_safe_print(first_str, file=sys.stderr)
for line in message_lines:
if line == '':
continue
message = next_wrapper.fill(line)
_safe_print(message, file=sys.stderr)
def error_message(message_lines: List[str]) -> None:
"""
Prints an error message to `STDERR` for fatal issues. The first line is
prepended with 'jc: Error - ' and subsequent lines are indented.
Wraps text as needed based on the terminal width.
Parameters:
message: (list) list of string lines
Returns:
None - just prints output to `STDERR`
"""
columns = shutil.get_terminal_size().columns
first_wrapper = TextWrapper(width=columns, subsequent_indent=' ' * 13)
next_wrapper = TextWrapper(width=columns, initial_indent=' ' * 13,
subsequent_indent=' ' * 17)
first_line = message_lines.pop(0)
first_str = f'jc: Error - {first_line}'
first_str = first_wrapper.fill(first_str)
_safe_print(first_str, file=sys.stderr)
for line in message_lines:
if line == '':
continue
message = next_wrapper.fill(line)
_safe_print(message, file=sys.stderr)
def is_compatible(compatible: List[str]) -> bool:
"""
Returns True if the parser is compatible with the running OS platform.
"""
platform_found = False
for platform in compatible:
if sys.platform.startswith(platform):
platform_found = True
break
return platform_found
def compatibility(mod_name: str, compatible: List[str], quiet: bool = False) -> None:
"""
Checks for the parser's compatibility with the running OS platform and
prints a warning message to `STDERR` if not compatible and
`quiet=False.`
Parameters:
mod_name: (string) __name__ of the calling module
compatible: (list) sys.platform name(s) compatible with
the parser. compatible options:
linux, darwin, cygwin, win32, aix, freebsd
quiet: (bool) suppress compatibility message if `True`
Returns:
None - just prints output to STDERR
"""
if not quiet and not is_compatible(compatible):
mod = mod_name.split('.')[-1]
compat_list = ', '.join(compatible)
warning_message([
f'`{mod}` command output from this OS ({sys.platform}) is not supported.',
f'`{mod}` command output from the following platforms is supported: {compat_list}',
'Disregard this warning if you are processing output that came from a supported platform. (Use the -q option to suppress this warning)'
])
def has_data(data: Union[str, bytes]) -> bool:
"""
Checks if the string input contains data. If there are any
non-whitespace characters then return `True`, else return `False`.
For bytes, returns `True` if there is any data.
Parameters:
data: (string, bytes) input to check whether it contains data
Returns:
Boolean `True` if input string (data) contains non-whitespace
characters, otherwise `False`. For bytes data, returns
`True` if there is any data, otherwise `False`.
"""
if isinstance(data, str):
return bool(data and not data.isspace())
return bool(data)
def remove_quotes(data: str) -> str:
"""
Remove single or double quotes surrounding a string. If no quotes are
found then the string is returned unmodified.
Parameters:
data: (string) Input value
Returns:
string
"""
if data.startswith('"') and data.endswith('"'):
data = data[1:-1]
elif data.startswith("'") and data.endswith("'"):
data = data[1:-1]
return data
def normalize_key(data: str) -> str:
r"""
Normalize a key name by shifting to lower-case and converting special
characters to underscores.
Special characters are defined as `space` and the following:
!"#$%&'()*+,-./:;<=>?@[\]^`{|}~
This is a lossy algorithm. Repeating and trailing underscores are
removed.
Parameters:
data: (string) Input value
Returns:
string
"""
special = r'''!"#$%&'()*+,-./:;<=>?@[\]^`{|}~ '''
initial_underscore = False
data = data.strip().lower()
for special_char in special:
data = data.replace(special_char, '_')
if data.startswith('_'):
initial_underscore = True
# swap back to space so split() will compress multiple consecutive down to one
data = data.strip('_').replace('_', ' ')
data = '_'.join(data.split())
if initial_underscore:
data = '_' + data
return data
def convert_to_int(value: object) -> Optional[int]:
"""
Converts string and float input to int. Strips all non-numeric
characters from strings.
Parameters:
value: (string/float) Input value
Returns:
integer/None Integer if successful conversion, otherwise None
"""
if isinstance(value, str):
str_val = re.sub(r'[^0-9\-\.]', '', value)
try:
return int(str_val)
except (ValueError, TypeError):
try:
return int(float(str_val))
except (ValueError, TypeError):
return None
elif isinstance(value, (int, float)):
return int(value)
else:
return None
def convert_to_float(value: object) -> Optional[float]:
"""
Converts string and int input to float. Strips all non-numeric
characters from strings.
Parameters:
value: (string/integer) Input value
Returns:
float/None Float if successful conversion, otherwise None
"""
if isinstance(value, str):
try:
return float(re.sub(r'[^0-9\-\.]', '', value))
except (ValueError, TypeError):
return None
elif isinstance(value, (int, float)):
return float(value)
else:
return None
def convert_to_bool(value: object) -> bool:
"""
Converts string, integer, or float input to boolean by checking
for 'truthy' values.
Parameters:
value: (string/integer/float) Input value
Returns:
True/False False unless a 'truthy' number or string is found
('y', 'yes', 'true', '1', 1, -1, etc.)
"""
# if number, then bool it
# if string, try to convert to float
# if float converts, then bool the result
# if float does not convert then look for truthy string and bool True
# else False
truthy = ['y', 'yes', 'true', '*']
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
try:
test_value = convert_to_float(value)
if test_value is not None:
return bool(test_value)
except Exception:
pass
if value:
return value.lower() in truthy
return False
# convert_size_to_int from https://github.com/xolox/python-humanfriendly
# Copyright (c) 2021 Peter Odding
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
def convert_size_to_int(
size: str,
binary: bool = False,
posix_mode: bool = False,
decimal_bias: bool = False) -> Optional[int]:
"""
Parse a human readable data size and return the number of bytes.
Parameters:
size: (string) The human readable file size to parse.
binary: (boolean) `True` to use binary multiples of bytes
(base-2) for ambiguous unit symbols and names,
`False` to use decimal multiples of bytes (base-10).
posix_mode: (boolean) Treat one-letter units (k, m, g, etc.) as
binary.
decimal_bias: (boolean) `True` to treat slightly ambiguous two-
letter unit symbols ending in "i" (e.g. Ki, Gi) to
use decimal multiples of bytes (base-10). `False`
(default) to use binary multiples of bytes.
Returns:
integer/None Integer if successful conversion, otherwise None
This function knows how to parse sizes in bytes, kilobytes, megabytes,
gigabytes, terabytes and petabytes. Some examples:
>>> convert_size_to_int('42')
42
>>> convert_size_to_int('13b')
13
>>> convert_size_to_int('5 bytes')
5
>>> convert_size_to_int('1 KB')
1000
>>> convert_size_to_int('1 kilobyte')
1000
>>> convert_size_to_int('1 KiB')
1024
>>> convert_size_to_int('1 Ki')
1024
>>> convert_size_to_int('1 Ki', decimal_bias=True)
1000
>>> convert_size_to_int('1 KB', binary=True)
1024
>>> convert_size_to_int('1.5 GB')
1500000000
>>> convert_size_to_int('1.5 GB', binary=True)
1610612736
"""
# normalize input by removing commas
size = size.replace(',', '')
def tokenize(text: str) -> List[str]:
tokenized_input: List = []
for token in re.split(r'(\d+(?:\.\d+)?)', text):
token = token.strip()
if re.match(r'\d+\.\d+', token):
tokenized_input.append(float(token))
elif token.isdigit():
tokenized_input.append(int(token))
elif token:
tokenized_input.append(token)
return tokenized_input
SizeUnit = namedtuple('SizeUnit', 'divider, symbol, name')
CombinedUnit = namedtuple('CombinedUnit', 'decimal, binary')
disk_size_units = (
CombinedUnit(SizeUnit(1000**1, 'KB', 'kilobyte'), SizeUnit(1024**1, 'KiB', 'kibibyte')),
CombinedUnit(SizeUnit(1000**2, 'MB', 'megabyte'), SizeUnit(1024**2, 'MiB', 'mebibyte')),
CombinedUnit(SizeUnit(1000**3, 'GB', 'gigabyte'), SizeUnit(1024**3, 'GiB', 'gibibyte')),
CombinedUnit(SizeUnit(1000**4, 'TB', 'terabyte'), SizeUnit(1024**4, 'TiB', 'tebibyte')),
CombinedUnit(SizeUnit(1000**5, 'PB', 'petabyte'), SizeUnit(1024**5, 'PiB', 'pebibyte')),
CombinedUnit(SizeUnit(1000**6, 'EB', 'exabyte'), SizeUnit(1024**6, 'EiB', 'exbibyte')),
CombinedUnit(SizeUnit(1000**7, 'ZB', 'zettabyte'), SizeUnit(1024**7, 'ZiB', 'zebibyte')),
CombinedUnit(SizeUnit(1000**8, 'YB', 'yottabyte'), SizeUnit(1024**8, 'YiB', 'yobibyte')),
)
tokens = tokenize(size)
if tokens and isinstance(tokens[0], Number):
# Get the normalized unit (if any) from the tokenized input.
normalized_unit = tokens[1].lower() if len(tokens) == 2 and isinstance(tokens[1], str) else ''
# If the input contains only a number, it's assumed to be the number of
# bytes. The second token can also explicitly reference the unit bytes.
if len(tokens) == 1 or normalized_unit.startswith('b'):
return int(tokens[0])
# Otherwise we expect two tokens: A number and a unit.
if normalized_unit:
# Convert plural units to singular units, for details:
# https://github.com/xolox/python-humanfriendly/issues/26
normalized_unit = normalized_unit.rstrip('s')
# Handle POSIX mode units where `k`, `m`, etc. are treated as binary
# https://www.gnu.org/software/coreutils/manual/html_node/Block-size.html
if len(normalized_unit) == 1 and posix_mode:
normalized_unit = normalized_unit + 'ib'
# Handle two-letter units (Ki, Gi, etc.) These are somewhat
# ambiguous, but are treated as binary by default. This can be
# changed with the `decimal_bias` parameter
if len(normalized_unit) == 2 and normalized_unit[1].lower() == 'i':
if decimal_bias:
normalized_unit = normalized_unit[0]
else:
normalized_unit = normalized_unit + 'b'
for unit in disk_size_units:
# First we check for unambiguous symbols (KiB, MiB, GiB, etc)
# and names (kibibyte, mebibyte, gibibyte, etc) because their
# handling is always the same.
if normalized_unit in (unit.binary.symbol.lower(), unit.binary.name.lower()):
return int(tokens[0] * unit.binary.divider)
# Now we will deal with ambiguous prefixes (K, M, G, etc),
# symbols (KB, MB, GB, etc) and names (kilobyte, megabyte,
# gigabyte, etc) according to the caller's preference.
if (normalized_unit in (unit.decimal.symbol.lower(), unit.decimal.name.lower())
or normalized_unit.startswith(unit.decimal.symbol[0].lower())):
return int(tokens[0] * (unit.binary.divider if binary else unit.decimal.divider))
# We failed to parse the size specification.
return None
def input_type_check(data: object) -> None:
"""Ensure input data is a string. Raises `TypeError` if not."""
if not isinstance(data, str):
raise TypeError("Input data must be a 'str' object.")
def _lazy_splitlines(text: str) -> Iterable[str]:
NEWLINES_PATTERN: str = r'(\r\n|\r|\n)'
NEWLINES_RE = re.compile(NEWLINES_PATTERN)
start = 0
for m in NEWLINES_RE.finditer(text):
begin, end = m.span()
if begin != start:
yield text[start:begin]
else:
yield ''
start = end
if text[start:]:
yield text[start:]
def line_slice(
data: Union[str, Iterable[str], TextIO, bytes, None],
slice_start: Optional[int] = None,
slice_end: Optional[int] = None
) -> Union[str, Iterable[str], TextIO, bytes, None]:
"""
Slice input data by lines - lazily, if possible.
Accepts a string (for normal parsers) or an iterable (for streaming
parsers). Uses normal start/stop slicing values, but will always slice
on lines instead of characters. Positive slices will use less memory as
the function will attempt to lazily iterate over the input. A negative
slice parameter will force the function to read in all of the data and
then slice, which will use more memory.
Parameters:
data: (string or iterable) - input to slice by lines
slice_start: (int) - starting line
slice_end: (int) - ending line
Returns:
string if input is a string.
iterable of strings if input is an iterable (for streaming parsers)
"""
if not slice_start is None or not slice_end is None:
# standard parsers UTF-8 input
if isinstance(data, str):
data_iter = _lazy_splitlines(data)
# positive slices
if (slice_start is None or slice_start >= 0) \
and (slice_end is None or slice_end >= 0):
return '\n'.join(islice(data_iter, slice_start, slice_end))
# negative slices found (non-lazy, uses more memory)
else:
return '\n'.join(list(data_iter)[slice_start:slice_end])
# standard parsers bytes input
elif isinstance(data, bytes):
raise ValueError('Cannot slice bytes data.')
# streaming parsers UTF-8 input
else:
# positive slices
if (slice_start is None or slice_start >= 0) \
and (slice_end is None or slice_end >= 0) \
and data:
return islice(data, slice_start, slice_end)
# negative slices found (non-lazy, uses more memory)
elif data:
return list(data)[slice_start:slice_end]
return data
class timestamp:
__slots__ = ('string', 'format', 'naive', 'utc', 'iso')
def __init__(self,
datetime_string: Optional[str],
format_hint: Optional[Iterable[int]] = None
) -> None:
"""
Input a datetime text string of several formats and convert to a
naive or timezone-aware epoch timestamp in UTC.
Parameters:
datetime_string (str): a string representation of a
datetime in several supported formats
format_hint (iterable): an optional iterable of format ID
integers to instruct the timestamp object to try those
formats first in the order given. Other formats will be
tried after the format hint list is exhausted. This can
speed up timestamp conversion so several different formats
don't have to be tried in brute-force fashion.
Returns a timestamp object with the following attributes:
string (str): the input datetime string
format (int | None): the format rule that was used to decode
the datetime string. None if conversion fails.
naive (int | None): timestamp based on locally configured
timezone. None if conversion fails.
utc (int | None): aware timestamp only if UTC timezone
detected in datetime string. None if conversion fails.
iso (str | None): ISO string - timezone information is output
only if UTC timezone is detected in the datetime string.
"""
self.string = datetime_string
if not format_hint:
format_hint = tuple()
else:
format_hint = tuple(format_hint)
dt = self._parse_dt(self.string, format_hint=format_hint)
self.format = dt['format']
self.naive = dt['timestamp_naive']
self.utc = dt['timestamp_utc']
self.iso = dt['iso']
def __repr__(self) -> str:
return f'timestamp(string={self.string!r}, format={self.format}, naive={self.naive}, utc={self.utc}, iso={self.iso!r})'
@staticmethod
@lru_cache(maxsize=2048)
def _parse_dt(
dt_string: Optional[str],
format_hint: Optional[Iterable[int]] = None
) -> Dict[str, Any]:
"""
Input a datetime text string of several formats and convert to
a naive or timezone-aware epoch timestamp in UTC.
Parameters:
dt_string: (string) a string representation of a date-time
in several supported formats
format_hint: (list | tuple) a list of format ID int's that
should be tried first. This can increase
performance since the function will not need to
try many incorrect formats before finding the
correct one.
Returns:
Dictionary of the following format:
{
# for debugging purposes. None if conversion fails
"format": int,
# timestamp based on locally configured timezone.
# None if conversion fails.
"timestamp_naive": int,
# aware timestamp only if UTC timezone detected.
# None if conversion fails.
"timestamp_utc": int
# ISO string. None if conversion fails.
"iso": str
}
The `format` integer denotes which date_time format
conversion succeeded.
The `timestamp_naive` integer is the converted date-time
string to a naive epoch timestamp.
The `timestamp_utc` integer is the converted date-time
string to an aware epoch timestamp in the UTC timezone. If
an aware conversion cannot be performed (e.g. the UTC
timezone is not found in the date-time string), then this
field will be None.
The `iso` string will only have timezone information if the
UTC timezone is detected in `dt_string`.
If the conversion completely fails, all fields will be None.
"""
formats: tuple[TimeStampFormatType, ...] = (
{'id': 1000, 'format': '%a %b %d %H:%M:%S %Y', 'locale': None}, # manual C locale format conversion: Tue Mar 23 16:12:11 2021 or Tue Mar 23 16:12:11 IST 2021
{'id': 1100, 'format': '%a %b %d %H:%M:%S %Y %z', 'locale': None}, # git date output: Thu Mar 5 09:17:40 2020 -0800
{'id': 1300, 'format': '%Y-%m-%dT%H:%M:%S.%f%Z', 'locale': None}, # ISO Format with UTC (found in syslog 5424): 2003-10-11T22:14:15.003Z
{'id': 1310, 'format': '%Y-%m-%dT%H:%M:%S.%f', 'locale': None}, # ISO Format without TZ (found in syslog 5424): 2003-10-11T22:14:15.003
{'id': 1400, 'format': '%b %d %Y %H:%M:%S.%f UTC', 'locale': None}, # CEF Format with UTC: Nov 08 2022 12:30:00.111 UTC
{'id': 1410, 'format': '%b %d %Y %H:%M:%S.%f', 'locale': None}, # CEF Format without TZ: Nov 08 2022 12:30:00.111
{'id': 1420, 'format': '%b %d %Y %H:%M:%S UTC', 'locale': None}, # CEF Format with UTC without microseconds: Nov 08 2022 12:30:00 UTC
{'id': 1430, 'format': '%b %d %Y %H:%M:%S', 'locale': None}, # CEF Format without TZ or microseconds: Nov 08 2022 12:30:00
{'id': 1500, 'format': '%Y-%m-%d %H:%M', 'locale': None}, # en_US.UTF-8 local format (found in who cli output): 2021-03-23 00:14
{'id': 1600, 'format': '%m/%d/%Y %I:%M %p', 'locale': None}, # Windows english format (found in dir cli output): 12/07/2019 02:09 AM
{'id': 1700, 'format': '%m/%d/%Y, %I:%M:%S %p', 'locale': None}, # Windows english format wint non-UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC-0600)
{'id': 1705, 'format': '%m/%d/%Y, %I:%M:%S %p %Z', 'locale': None}, # Windows english format with UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC)
{'id': 1710, 'format': '%m/%d/%Y, %I:%M:%S %p UTC%z', 'locale': None}, # Windows english format with UTC tz (found in systeminfo cli output): 3/22/2021, 1:15:51 PM (UTC+0000)
{'id': 1720, 'format': '%A, %B %d, %Y %I:%M:%S %p', 'locale': None}, # ipconfig cli output format: Thursday, June 22, 2023 10:39:04 AM
{'id': 1750, 'format': '%Y/%m/%d-%H:%M:%S.%f', 'locale': None}, # Google Big Table format with no timezone: 1970/01/01-01:00:00.000000
{'id': 1755, 'format': '%Y/%m/%d-%H:%M:%S.%f%z', 'locale': None}, # Google Big Table format with timezone: 1970/01/01-01:00:00.000000+00:00
{'id': 1760, 'format': '%Y-%m-%d %H:%M:%S%z', 'locale': None}, # certbot format with timezone: 2023-06-12 01:35:30+00:00
{'id': 1800, 'format': '%d/%b/%Y:%H:%M:%S %z', 'locale': None}, # Common Log Format: 10/Oct/2000:13:55:36 -0700
{'id': 2000, 'format': '%a %d %b %Y %I:%M:%S %p %Z', 'locale': None}, # en_US.UTF-8 local format (found in upower cli output): Tue 23 Mar 2021 04:12:11 PM UTC
{'id': 3000, 'format': '%a %d %b %Y %I:%M:%S %p', 'locale': None}, # en_US.UTF-8 local format with non-UTC tz (found in upower cli output): Tue 23 Mar 2021 04:12:11 PM IST
{'id': 3100, 'format': '%a %d %b %Y %I:%M:%S %p %z', 'locale': None}, # pacman format - append 00 to end to make it work: # Sat 11 May 2024 06:14:19 AM +0800
{'id': 3500, 'format': '%a, %d %b %Y %H:%M:%S %Z', 'locale': None}, # HTTP header time format (always GMT so assume UTC): Wed, 31 Jan 2024 00:39:28 GMT
{'id': 4000, 'format': '%A %d %B %Y %I:%M:%S %p %Z', 'locale': None}, # European-style local format (found in upower cli output): Tuesday 01 October 2019 12:50:41 PM UTC
{'id': 5000, 'format': '%A %d %B %Y %I:%M:%S %p', 'locale': None}, # European-style local format with non-UTC tz (found in upower cli output): Tuesday 01 October 2019 12:50:41 PM IST
{'id': 6000, 'format': '%a %b %d %I:%M:%S %p %Z %Y', 'locale': None}, # en_US.UTF-8 format (found in date cli): Wed Mar 24 06:16:19 PM UTC 2021
{'id': 7000, 'format': '%a %b %d %H:%M:%S %Z %Y', 'locale': None}, # C locale format (found in date cli): Wed Mar 24 11:11:30 UTC 2021
{'id': 7100, 'format': '%b %d %H:%M:%S %Y', 'locale': None}, # C locale format (found in stat cli output - osx): # Mar 29 11:49:05 2021
{'id': 7200, 'format': '%Y-%m-%d %H:%M:%S.%f %z', 'locale': None}, # C locale format (found in stat cli output - linux): 2019-08-13 18:13:43.555604315 -0400
{'id': 7250, 'format': '%Y-%m-%d %H:%M:%S', 'locale': None}, # C locale format with non-UTC tz (found in modified vmstat cli output): # 2021-09-16 20:32:28 PDT
{'id': 7255, 'format': '%Y-%m-%d %H:%M:%S %Z', 'locale': None}, # C locale format (found in modified vmstat cli output): # 2021-09-16 20:32:28 UTC
{'id': 7300, 'format': '%a %Y-%m-%d %H:%M:%S %Z', 'locale': None}, # C locale format (found in timedatectl cli output): # Wed 2020-03-11 00:53:21 UTC
# attempt locale changes last
{'id': 8000, 'format': '%a %d %b %Y %H:%M:%S %Z', 'locale': ''}, # current locale format (found in upower cli output): # mar. 23 mars 2021 23:12:11 UTC
{'id': 8100, 'format': '%a %d %b %Y %H:%M:%S', 'locale': ''}, # current locale format with non-UTC tz (found in upower cli output): # mar. 23 mars 2021 19:12:11 EDT
{'id': 8200, 'format': '%A %d %B %Y, %H:%M:%S UTC%z', 'locale': ''}, # fr_FR.utf8 locale format (found in date cli output): vendredi 26 mars 2021, 13:26:46 (UTC+0000)
{'id': 8300, 'format': '%A %d %B %Y, %H:%M:%S', 'locale': ''}, # fr_FR.utf8 locale format with non-UTC tz (found in date cli output): vendredi 26 mars 2021, 13:26:46 (UTC-0400)
{'id': 9000, 'format': '%c', 'locale': ''} # locally configured locale format conversion: Could be anything :) this is a last-gasp attempt
)
# from https://www.timeanddate.com/time/zones/
# only removed UTC & GMT timezones and added known non-UTC offsets
tz_abbr: set[str] = {
'A', 'ACDT', 'ACST', 'ACT', 'ACWST', 'ADT', 'AEDT', 'AEST', 'AET', 'AFT', 'AKDT',
'AKST', 'ALMT', 'AMST', 'AMT', 'ANAST', 'ANAT', 'AQTT', 'ART', 'AST', 'AT', 'AWDT',
'AWST', 'AZOST', 'AZOT', 'AZST', 'AZT', 'AoE', 'B', 'BNT', 'BOT', 'BRST', 'BRT', 'BST',
'BTT', 'C', 'CAST', 'CAT', 'CCT', 'CDT', 'CEST', 'CET', 'CHADT', 'CHAST', 'CHOST',
'CHOT', 'CHUT', 'CIDST', 'CIST', 'CKT', 'CLST', 'CLT', 'COT', 'CST', 'CT', 'CVT', 'CXT',
'ChST', 'D', 'DAVT', 'DDUT', 'E', 'EASST', 'EAST', 'EAT', 'ECT', 'EDT', 'EEST', 'EET',
'EGST', 'EGT', 'EST', 'ET', 'F', 'FET', 'FJST', 'FJT', 'FKST', 'FKT', 'FNT', 'G',
'GALT', 'GAMT', 'GET', 'GFT', 'GILT', 'GST', 'GYT', 'H', 'HDT', 'HKT', 'HOVST',
'HOVT', 'HST', 'I', 'ICT', 'IDT', 'IOT', 'IRDT', 'IRKST', 'IRKT', 'IRST', 'IST', 'JST',
'K', 'KGT', 'KOST', 'KRAST', 'KRAT', 'KST', 'KUYT', 'L', 'LHDT', 'LHST', 'LINT', 'M',
'MAGST', 'MAGT', 'MART', 'MAWT', 'MDT', 'MHT', 'MMT', 'MSD', 'MSK', 'MST', 'MT', 'MUT',
'MVT', 'MYT', 'N', 'NCT', 'NDT', 'NFDT', 'NFT', 'NOVST', 'NOVT', 'NPT', 'NRT', 'NST',
'NUT', 'NZDT', 'NZST', 'O', 'OMSST', 'OMST', 'ORAT', 'P', 'PDT', 'PET', 'PETST', 'PETT',
'PGT', 'PHOT', 'PHT', 'PKT', 'PMDT', 'PMST', 'PONT', 'PST', 'PT', 'PWT', 'PYST', 'PYT',
'Q', 'QYZT', 'R', 'RET', 'ROTT', 'S', 'SAKT', 'SAMT', 'SAST', 'SBT', 'SCT', 'SGT',
'SRET', 'SRT', 'SST', 'SYOT', 'T', 'TAHT', 'TFT', 'TJT', 'TKT', 'TLT', 'TMT', 'TOST',
'TOT', 'TRT', 'TVT', 'U', 'ULAST', 'ULAT', 'UYST', 'UYT', 'UZT', 'V', 'VET', 'VLAST',
'VLAT', 'VOST', 'VUT', 'W', 'WAKT', 'WARST', 'WAST', 'WAT', 'WEST', 'WET', 'WFT',
'WGST', 'WGT', 'WIB', 'WIT', 'WITA', 'WST', 'WT', 'X', 'Y', 'YAKST', 'YAKT', 'YAPT',
'YEKST', 'YEKT', 'UTC-1200', 'UTC-1100', 'UTC-1000', 'UTC-0930', 'UTC-0900',
'UTC-0800', 'UTC-0700', 'UTC-0600', 'UTC-0500', 'UTC-0400', 'UTC-0300', 'UTC-0230',
'UTC-0200', 'UTC-0100', 'UTC+0100', 'UTC+0200', 'UTC+0300', 'UTC+0400', 'UTC+0430',
'UTC+0500', 'UTC+0530', 'UTC+0545', 'UTC+0600', 'UTC+0630', 'UTC+0700', 'UTC+0800',
'UTC+0845', 'UTC+0900', 'UTC+1000', 'UTC+1030', 'UTC+1100', 'UTC+1200', 'UTC+1300',
'UTC+1345', 'UTC+1400'
}
offset_suffixes: tuple[str, ...] = (
'-12:00', '-11:00', '-10:00', '-09:30', '-09:00',
'-08:00', '-07:00', '-06:00', '-05:00', '-04:00', '-03:00', '-02:30',
'-02:00', '-01:00', '+01:00', '+02:00', '+03:00', '+04:00', '+04:30',
'+05:00', '+05:30', '+05:45', '+06:00', '+06:30', '+07:00', '+08:00',
'+08:45', '+09:00', '+10:00', '+10:30', '+11:00', '+12:00', '+13:00',
'+13:45', '+14:00'
)
data: str = dt_string or ''
normalized_datetime: str = ''
utc_tz: bool = False
dt: Optional[datetime] = None
dt_utc: Optional[datetime] = None
timestamp_naive: Optional[int] = None
timestamp_utc: Optional[int] = None
iso_string: Optional[str] = None
timestamp_obj: Dict[str, Any] = {
'format': None,
'timestamp_naive': None,
'timestamp_utc': None,
'iso': None
}
# convert format_hint to a tuple so it is hashable (for lru_cache)
if not format_hint:
format_hint = tuple()
else:
format_hint = tuple(format_hint)
# sometimes UTC is referenced as 'Coordinated Universal Time'. Convert to 'UTC'
data = data.replace('Coordinated Universal Time', 'UTC')
# UTC can also be indicated with 'Z' for Zulu time (ISO-8601). Convert to 'UTC'
data = data.replace('Z', 'UTC')
# GMT and UTC are practically equivalent. Convert to 'UTC'
data = data.replace('GMT', 'UTC')
if 'UTC' in data:
utc_tz = True
if 'UTC+' in data or 'UTC-' in data:
utc_tz = bool('UTC+0000' in data or 'UTC-0000' in data)
elif '+0000' in data \
or '-0000' in data \
or '+00:00' in data \
or '-00:00' in data:
utc_tz = True
data = data.replace('+00:00', '+0000') # fix for python 3.6
# normalize the timezone by taking out any timezone reference, except UTC
cleandata = data.replace('(', '').replace(')', '')
normalized_datetime_list: List[str] = []
for term in cleandata.split():
if term not in tz_abbr:
normalized_datetime_list.append(term)
normalized_datetime = ' '.join(normalized_datetime_list)
# remove non UTC offset suffixes at the end of the string
for suffix in offset_suffixes:
if normalized_datetime.endswith(suffix):
normalized_datetime = normalized_datetime[0:-len(suffix)]
break
# normalize further by converting any greater-than 6-digit subsecond to 6-digits
p = re.compile(r'(\W\d\d:\d\d:\d\d\.\d{6})\d+\W')
normalized_datetime = p.sub(r'\g<1> ', normalized_datetime)
# try format hints first, then fall back to brute-force method
hint_obj_list: List[TimeStampFormatType] = []
for fmt_id in format_hint:
for fmt in formats:
if fmt_id == fmt['id']:
hint_obj_list.append(fmt)
remaining_formats = [fmt for fmt in formats if not fmt['id'] in format_hint]
optimized_formats = hint_obj_list + remaining_formats
for fmt in optimized_formats:
try:
locale.setlocale(locale.LC_TIME, fmt['locale'])
dt = datetime.strptime(normalized_datetime, fmt['format'])
timestamp_obj['format'] = fmt['id']
timestamp_naive = int(dt.replace(tzinfo=None).timestamp())
iso_string = dt.replace(tzinfo=None).isoformat()
locale.setlocale(locale.LC_TIME, None)
break
except Exception:
locale.setlocale(locale.LC_TIME, None)
continue
if dt and utc_tz:
dt_utc = dt.replace(tzinfo=timezone.utc)
timestamp_utc = int(dt_utc.timestamp())
iso_string = dt_utc.isoformat()
if timestamp_naive:
timestamp_obj['timestamp_naive'] = timestamp_naive
timestamp_obj['timestamp_utc'] = timestamp_utc
timestamp_obj['iso'] = iso_string
return timestamp_obj