1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-19 00:17:51 +02:00

add jc.parse() high-level API

This commit is contained in:
Kelly Brazil
2022-01-18 13:10:14 -08:00
parent 830674cc6f
commit 0a9dde58c5
3 changed files with 228 additions and 134 deletions

View File

@ -55,8 +55,22 @@ CLI Example:
Module Example:
>>> import jc.parsers.dig
>>> import subprocess
>>> import jc
>>>
>>> cmd_output = subprocess.check_output(['dig', 'example.com'], text=True)
>>> data = jc.parse('dig', cmd_output)
>>>
>>> data
[{'id': 64612, 'opcode': 'QUERY', 'status': 'NOERROR', 'flags': ['qr', 'rd', 'ra'], 'query_num': 1, 'answer_num':
1, 'authority_num': 0, 'additional_num': 1, 'opt_pseudosection': {'edns': {'version': 0, 'flags': [], 'udp':
4096}}, 'question': {'name': 'example.com.', 'class': 'IN', 'type': 'A'}, 'answer': [{'name': 'example.com.',
'class': 'IN', 'type': 'A', 'ttl': 29658, 'data': '93.184.216.34'}], 'query_time': 52, 'server':
'2600:1700:bab0:d40::1#53(2600:1700:bab0:d40::1)', 'when': 'Fri Apr 16 16:13:00 PDT 2021', 'rcvd': 56,
'when_epoch': 1618614780, 'when_epoch_utc': None}]
Alternatively, you can bypass the high-level API and call the parser modules directly:
>>> import subprocess
>>> import jc.parsers.dig
>>>
@ -71,6 +85,8 @@ Module Example:
'2600:1700:bab0:d40::1#53(2600:1700:bab0:d40::1)', 'when': 'Fri Apr 16 16:13:00 PDT 2021', 'rcvd': 56,
'when_epoch': 1618614780, 'when_epoch_utc': None}]
"""
from .lib import __version__, parse, parser_mod_list, plugin_parser_mod_list
name = 'jc'
__version__ = '1.17.7'
# cleanup
del lib
del appdirs

148
jc/cli.py
View File

@ -4,19 +4,16 @@ JC cli module
import sys
import os
import os.path
import re
import importlib
import textwrap
import signal
import shlex
import subprocess
import json
import jc
from jc import appdirs
import jc.utils
import jc.tracebackplus
from jc.exceptions import LibraryNotInstalled, ParseError
from .lib import __version__, parsers, local_parsers
from . import utils
from . import tracebackplus
from .exceptions import LibraryNotInstalled, ParseError
# make pygments import optional
try:
@ -31,8 +28,11 @@ except Exception:
PYGMENTS_INSTALLED = False
JC_ERROR_EXIT = 100
class info():
version = jc.__version__
version = __version__
description = 'JSON CLI output utility'
author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com'
@ -41,114 +41,6 @@ class info():
license = 'MIT License'
__version__ = info.version
parsers = [
'acpi',
'airport',
'airport-s',
'arp',
'blkid',
'cksum',
'crontab',
'crontab-u',
'csv',
'csv-s',
'date',
'df',
'dig',
'dir',
'dmidecode',
'dpkg-l',
'du',
'env',
'file',
'finger',
'free',
'fstab',
'group',
'gshadow',
'hash',
'hashsum',
'hciconfig',
'history',
'hosts',
'id',
'ifconfig',
'ini',
'iostat',
'iostat-s',
'iptables',
'iw-scan',
'jar-manifest',
'jobs',
'kv',
'last',
'ls',
'ls-s',
'lsblk',
'lsmod',
'lsof',
'lsusb',
'mount',
'netstat',
'ntpq',
'passwd',
'ping',
'ping-s',
'pip-list',
'pip-show',
'ps',
'route',
'rpm-qi',
'sfdisk',
'shadow',
'ss',
'stat',
'stat-s',
'sysctl',
'systemctl',
'systemctl-lj',
'systemctl-ls',
'systemctl-luf',
'systeminfo',
'time',
'timedatectl',
'tracepath',
'traceroute',
'ufw',
'ufw-appinfo',
'uname',
'upower',
'uptime',
'vmstat',
'vmstat-s',
'w',
'wc',
'who',
'xml',
'yaml',
'zipinfo'
]
JC_ERROR_EXIT = 100
# List of custom or override parsers.
# Allow any <user_data_dir>/jc/jcparsers/*.py
local_parsers = []
data_dir = appdirs.user_data_dir('jc', 'jc')
local_parsers_dir = os.path.join(data_dir, 'jcparsers')
if os.path.isdir(local_parsers_dir):
sys.path.append(data_dir)
for name in os.listdir(local_parsers_dir):
if re.match(r'\w+\.py$', name) and os.path.isfile(os.path.join(local_parsers_dir, name)):
plugin_name = name[0:-3]
local_parsers.append(plugin_name)
if plugin_name not in parsers:
parsers.append(plugin_name)
# We only support 2.3.0+, pygments changed color names in 2.4.0.
# startswith is sufficient and avoids potential exceptions from split and int.
if PYGMENTS_INSTALLED:
@ -226,7 +118,7 @@ def set_env_colors(env_colors=None):
# if there is an issue with the env variable, just set all colors to default and move on
if input_error:
jc.utils.warning_message(['Could not parse JC_COLORS environment variable'])
utils.warning_message(['Could not parse JC_COLORS environment variable'])
color_list = ['default', 'default', 'default', 'default']
# Try the color set in the JC_COLORS env variable first. If it is set to default, then fall back to default colors
@ -543,7 +435,7 @@ def main():
version_info = 'v' in options
if verbose_debug:
jc.tracebackplus.enable(context=11)
tracebackplus.enable(context=11)
if not PYGMENTS_INSTALLED:
mono = True
@ -582,25 +474,25 @@ def main():
if debug:
raise
jc.utils.error_message([f'"{run_command_str}" command could not be found. For details use the -d or -dd option.'])
utils.error_message([f'"{run_command_str}" command could not be found. For details use the -d or -dd option.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
except OSError:
if debug:
raise
jc.utils.error_message([f'"{run_command_str}" command could not be run due to too many open files. For details use the -d or -dd option.'])
utils.error_message([f'"{run_command_str}" command could not be run due to too many open files. For details use the -d or -dd option.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
except Exception:
if debug:
raise
jc.utils.error_message([f'"{run_command_str}" command could not be run. For details use the -d or -dd option.'])
utils.error_message([f'"{run_command_str}" command could not be run. For details use the -d or -dd option.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
elif run_command is not None:
jc.utils.error_message([f'"{run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.'])
utils.error_message([f'"{run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
# find the correct parser
@ -619,16 +511,16 @@ def main():
break
if not found:
jc.utils.error_message(['Missing or incorrect arguments. Use "jc -h" for help.'])
utils.error_message(['Missing or incorrect arguments. Use "jc -h" for help.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
# check for input errors (pipe vs magic)
if not sys.stdin.isatty() and magic_stdout:
jc.utils.error_message(['Piped data and Magic syntax used simultaneously. Use "jc -h" for help.'])
utils.error_message(['Piped data and Magic syntax used simultaneously. Use "jc -h" for help.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
elif sys.stdin.isatty() and magic_stdout is None:
jc.utils.error_message(['Missing piped data. Use "jc -h" for help.'])
utils.error_message(['Missing piped data. Use "jc -h" for help.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
# parse and print to stdout
@ -665,7 +557,7 @@ def main():
if debug:
raise
jc.utils.error_message([f'Parser issue with {parser_name}:',
utils.error_message([f'Parser issue with {parser_name}:',
f'{e.__class__.__name__}: {e}',
'For details use the -d or -dd option. Use "jc -h" for help.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
@ -674,7 +566,7 @@ def main():
if debug:
raise
jc.utils.error_message(['There was an issue generating the JSON output.',
utils.error_message(['There was an issue generating the JSON output.',
'For details use the -d or -dd option.'])
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
@ -686,7 +578,7 @@ def main():
if getattr(parser.info, 'streaming', None):
streaming_msg = 'Use the -qq option to ignore streaming parser errors.'
jc.utils.error_message([
utils.error_message([
f'{parser_name} parser could not parse the input data. Did you use the correct parser?',
f'{streaming_msg}',
'For details use the -d or -dd option. Use "jc -h" for help.'

186
jc/lib.py Normal file
View File

@ -0,0 +1,186 @@
"""jc - JSON CLI output utility
JC lib module
"""
import sys
import os
import re
import importlib
from . import appdirs
__version__ = '1.18.0'
parsers = [
'acpi',
'airport',
'airport-s',
'arp',
'blkid',
'cksum',
'crontab',
'crontab-u',
'csv',
'csv-s',
'date',
'df',
'dig',
'dir',
'dmidecode',
'dpkg-l',
'du',
'env',
'file',
'finger',
'free',
'fstab',
'group',
'gshadow',
'hash',
'hashsum',
'hciconfig',
'history',
'hosts',
'id',
'ifconfig',
'ini',
'iostat',
'iostat-s',
'iptables',
'iw-scan',
'jar-manifest',
'jobs',
'kv',
'last',
'ls',
'ls-s',
'lsblk',
'lsmod',
'lsof',
'lsusb',
'mount',
'netstat',
'ntpq',
'passwd',
'ping',
'ping-s',
'pip-list',
'pip-show',
'ps',
'route',
'rpm-qi',
'sfdisk',
'shadow',
'ss',
'stat',
'stat-s',
'sysctl',
'systemctl',
'systemctl-lj',
'systemctl-ls',
'systemctl-luf',
'systeminfo',
'time',
'timedatectl',
'tracepath',
'traceroute',
'ufw',
'ufw-appinfo',
'uname',
'upower',
'uptime',
'vmstat',
'vmstat-s',
'w',
'wc',
'who',
'xml',
'yaml',
'zipinfo'
]
# List of custom or override parsers.
# Allow any <user_data_dir>/jc/jcparsers/*.py
local_parsers = []
data_dir = appdirs.user_data_dir('jc', 'jc')
local_parsers_dir = os.path.join(data_dir, 'jcparsers')
if os.path.isdir(local_parsers_dir):
sys.path.append(data_dir)
for name in os.listdir(local_parsers_dir):
if re.match(r'\w+\.py$', name) and os.path.isfile(os.path.join(local_parsers_dir, name)):
plugin_name = name[0:-3]
local_parsers.append(plugin_name)
if plugin_name not in parsers:
parsers.append(plugin_name)
def _cliname_to_modname(parser_cli_name):
"""Return real module name (dashes converted to underscores)"""
return parser_cli_name.replace('-', '_')
def _modname_to_cliname(parser_mod_name):
"""Return module's cli name (underscores converted to dashes)"""
return parser_mod_name.replace('_', '-')
def parse(parser_mod_name, data, quiet=False, raw=False, ignore_exceptions=None, **kwargs):
"""
Parse the string data using the supplied parser module.
This function provides a high-level API to simplify parser use. This function will
call built-in parsers and custom plugin parsers.
Example:
>>> import jc
>>> jc.parse('date', 'Tue Jan 18 10:23:07 PST 2022')
{'year': 2022, 'month': 'Jan', 'month_num': 1, 'day'...}
To get a list of available parser module names, use `parser_mod_list()`
or `plugin_parser_mod_list()`.
You can also use the lower-level parser modules directly:
>>> import jc.parsers.date
>>> jc.parsers.date.parse('Tue Jan 18 10:23:07 PST 2022')
Though, accessing plugin parsers directly is a bit more involved:
>>> import os
>>> import sys
>>> import jc.appdirs
>>> data_dir = jc.appdirs.user_data_dir('jc', 'jc')
>>> local_parsers_dir = os.path.join(data_dir, 'jcparsers')
>>> sys.path.append(local_parsers_dir)
>>> import my_custom_parser
>>> my_custom_parser.parse('command_data')
Parameters:
parser_mod_name: (string) Name of the parser module
data: (string or iterator) Data to parse (string for normal parsers,
iterator of strings for streaming parsers)
raw: (boolean) output preprocessed JSON if True
quiet: (boolean) suppress warning messages if True
ignore_exceptions: (boolean) ignore parsing exceptions if True (streaming
parsers only)
Returns:
Standard Parsers: Dictionary or List of Dictionaries
Streaming Parsers: Generator Object
"""
parser_cli_name = _modname_to_cliname(parser_mod_name)
modpath = ('jcparsers.' if parser_cli_name in local_parsers else 'jc.parsers.')
jc_parser = importlib.import_module(f'{modpath}{parser_mod_name}')
if ignore_exceptions is not None:
return jc_parser.parse(data, quiet=quiet, raw=raw, ignore_exceptions=ignore_exceptions, **kwargs)
else:
return jc_parser.parse(data, quiet=quiet, raw=raw, **kwargs)
def parser_mod_list():
"""list of all available parser module names."""
return [_cliname_to_modname(p) for p in parsers]
def plugin_parser_mod_list():
"""list of plugin parser module names."""
return [_cliname_to_modname(p) for p in local_parsers]