mirror of
https://github.com/kellyjonbrazil/jc.git
synced 2025-06-19 00:17:51 +02:00
refactor cli
This commit is contained in:
767
jc/cli.old.py
Normal file
767
jc/cli.old.py
Normal file
@ -0,0 +1,767 @@
|
||||
"""jc - JSON Convert
|
||||
JC cli module
|
||||
"""
|
||||
|
||||
import io
|
||||
import sys
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
import textwrap
|
||||
import signal
|
||||
import shlex
|
||||
import subprocess
|
||||
from .lib import (__version__, parser_info, all_parser_info, parsers,
|
||||
_get_parser, _parser_is_streaming, parser_mod_list,
|
||||
standard_parser_mod_list, plugin_parser_mod_list,
|
||||
streaming_parser_mod_list)
|
||||
from . import utils
|
||||
from .cli_data import long_options_map, new_pygments_colors, old_pygments_colors
|
||||
from .shell_completions import bash_completion, zsh_completion
|
||||
from . import tracebackplus
|
||||
from .exceptions import LibraryNotInstalled, ParseError
|
||||
|
||||
# make pygments import optional
|
||||
try:
|
||||
import pygments
|
||||
from pygments import highlight
|
||||
from pygments.style import Style
|
||||
from pygments.token import (Name, Number, String, Keyword)
|
||||
from pygments.lexers.data import JsonLexer, YamlLexer
|
||||
from pygments.formatters import Terminal256Formatter
|
||||
PYGMENTS_INSTALLED = True
|
||||
except Exception:
|
||||
PYGMENTS_INSTALLED = False
|
||||
|
||||
JC_ERROR_EXIT = 100
|
||||
|
||||
|
||||
class info():
|
||||
version = __version__
|
||||
description = 'JSON Convert'
|
||||
author = 'Kelly Brazil'
|
||||
author_email = 'kellyjonbrazil@gmail.com'
|
||||
website = 'https://github.com/kellyjonbrazil/jc'
|
||||
copyright = '© 2019-2022 Kelly Brazil'
|
||||
license = 'MIT License'
|
||||
|
||||
|
||||
# We only support 2.3.0+, pygments changed color names in 2.4.0.
|
||||
# startswith is sufficient and avoids potential exceptions from split and int.
|
||||
if PYGMENTS_INSTALLED:
|
||||
if pygments.__version__.startswith('2.3.'):
|
||||
PYGMENT_COLOR = old_pygments_colors
|
||||
else:
|
||||
PYGMENT_COLOR = new_pygments_colors
|
||||
|
||||
|
||||
def set_env_colors(env_colors=None):
|
||||
"""
|
||||
Return a dictionary to be used in Pygments custom style class.
|
||||
|
||||
Grab custom colors from JC_COLORS environment variable. JC_COLORS env
|
||||
variable takes 4 comma separated string values and should be in the
|
||||
format of:
|
||||
|
||||
JC_COLORS=<keyname_color>,<keyword_color>,<number_color>,<string_color>
|
||||
|
||||
Where colors are: black, red, green, yellow, blue, magenta, cyan, gray,
|
||||
brightblack, brightred, brightgreen, brightyellow,
|
||||
brightblue, brightmagenta, brightcyan, white, default
|
||||
|
||||
Default colors:
|
||||
|
||||
JC_COLORS=blue,brightblack,magenta,green
|
||||
or
|
||||
JC_COLORS=default,default,default,default
|
||||
"""
|
||||
input_error = False
|
||||
|
||||
if env_colors:
|
||||
color_list = env_colors.split(',')
|
||||
else:
|
||||
color_list = ['default', 'default', 'default', 'default']
|
||||
|
||||
if len(color_list) != 4:
|
||||
input_error = True
|
||||
|
||||
for color in color_list:
|
||||
if color != 'default' and color not in PYGMENT_COLOR:
|
||||
input_error = True
|
||||
|
||||
# if there is an issue with the env variable, just set all colors to default and move on
|
||||
if input_error:
|
||||
utils.warning_message(['Could not parse JC_COLORS environment variable'])
|
||||
color_list = ['default', 'default', 'default', 'default']
|
||||
|
||||
# Try the color set in the JC_COLORS env variable first. If it is set to default, then fall back to default colors
|
||||
return {
|
||||
Name.Tag: f'bold {PYGMENT_COLOR[color_list[0]]}' if color_list[0] != 'default' else f"bold {PYGMENT_COLOR['blue']}", # key names
|
||||
Keyword: PYGMENT_COLOR[color_list[1]] if color_list[1] != 'default' else PYGMENT_COLOR['brightblack'], # true, false, null
|
||||
Number: PYGMENT_COLOR[color_list[2]] if color_list[2] != 'default' else PYGMENT_COLOR['magenta'], # numbers
|
||||
String: PYGMENT_COLOR[color_list[3]] if color_list[3] != 'default' else PYGMENT_COLOR['green'] # strings
|
||||
}
|
||||
|
||||
|
||||
def piped_output(force_color):
|
||||
"""
|
||||
Return False if `STDOUT` is a TTY. True if output is being piped to
|
||||
another program and foce_color is True. This allows forcing of ANSI
|
||||
color codes even when using pipes.
|
||||
"""
|
||||
return not sys.stdout.isatty() and not force_color
|
||||
|
||||
|
||||
def ctrlc(signum, frame):
|
||||
"""Exit with error on SIGINT"""
|
||||
sys.exit(JC_ERROR_EXIT)
|
||||
|
||||
|
||||
def parser_shortname(parser_arg):
|
||||
"""Return short name of the parser with dashes and no -- prefix"""
|
||||
return parser_arg[2:]
|
||||
|
||||
|
||||
def parsers_text(indent=0, pad=0, show_hidden=False):
|
||||
"""Return the argument and description information from each parser"""
|
||||
ptext = ''
|
||||
padding_char = ' '
|
||||
for p in all_parser_info(show_hidden=show_hidden):
|
||||
parser_arg = p.get('argument', 'UNKNOWN')
|
||||
padding = pad - len(parser_arg)
|
||||
parser_desc = p.get('description', 'No description available.')
|
||||
indent_text = padding_char * indent
|
||||
padding_text = padding_char * padding
|
||||
ptext += indent_text + parser_arg + padding_text + parser_desc + '\n'
|
||||
|
||||
return ptext
|
||||
|
||||
|
||||
def options_text(indent=0, pad=0):
|
||||
"""Return the argument and description information from each option"""
|
||||
otext = ''
|
||||
padding_char = ' '
|
||||
for option in long_options_map:
|
||||
o_short = '-' + long_options_map[option][0]
|
||||
o_desc = long_options_map[option][1]
|
||||
o_combined = o_short + ', ' + option
|
||||
padding = pad - len(o_combined)
|
||||
indent_text = padding_char * indent
|
||||
padding_text = padding_char * padding
|
||||
otext += indent_text + o_combined + padding_text + o_desc + '\n'
|
||||
|
||||
return otext
|
||||
|
||||
|
||||
def about_jc():
|
||||
"""Return jc info and the contents of each parser.info as a dictionary"""
|
||||
return {
|
||||
'name': 'jc',
|
||||
'version': info.version,
|
||||
'description': info.description,
|
||||
'author': info.author,
|
||||
'author_email': info.author_email,
|
||||
'website': info.website,
|
||||
'copyright': info.copyright,
|
||||
'license': info.license,
|
||||
'python_version': '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro))),
|
||||
'python_path': sys.executable,
|
||||
'parser_count': len(parser_mod_list()),
|
||||
'standard_parser_count': len(standard_parser_mod_list()),
|
||||
'streaming_parser_count': len(streaming_parser_mod_list()),
|
||||
'plugin_parser_count': len(plugin_parser_mod_list()),
|
||||
'parsers': all_parser_info(show_hidden=True)
|
||||
}
|
||||
|
||||
|
||||
def helptext(show_hidden=False):
|
||||
"""Return the help text with the list of parsers"""
|
||||
parsers_string = parsers_text(indent=4, pad=20, show_hidden=show_hidden)
|
||||
options_string = options_text(indent=4, pad=20)
|
||||
|
||||
helptext_string = f'''\
|
||||
jc converts the output of many commands, file-types, and strings to JSON or YAML
|
||||
|
||||
Usage:
|
||||
|
||||
Standard syntax:
|
||||
|
||||
COMMAND | jc [OPTIONS] PARSER
|
||||
|
||||
cat FILE | jc [OPTIONS] PARSER
|
||||
|
||||
echo STRING | jc [OPTIONS] PARSER
|
||||
|
||||
Magic syntax:
|
||||
|
||||
jc [OPTIONS] COMMAND
|
||||
|
||||
jc [OPTIONS] /proc/<path-to-procfile>
|
||||
|
||||
Parsers:
|
||||
{parsers_string}
|
||||
Options:
|
||||
{options_string}
|
||||
Examples:
|
||||
Standard Syntax:
|
||||
$ dig www.google.com | jc --pretty --dig
|
||||
$ cat /proc/meminfo | jc --pretty --proc
|
||||
|
||||
Magic Syntax:
|
||||
$ jc --pretty dig www.google.com
|
||||
$ jc --pretty /proc/meminfo
|
||||
|
||||
Parser Documentation:
|
||||
$ jc --help --dig
|
||||
|
||||
Show Hidden Parsers:
|
||||
$ jc -hh
|
||||
'''
|
||||
|
||||
return helptext_string
|
||||
|
||||
|
||||
def help_doc(options, show_hidden=False):
|
||||
"""
|
||||
Returns the parser documentation if a parser is found in the arguments,
|
||||
otherwise the general help text is returned.
|
||||
"""
|
||||
for arg in options:
|
||||
parser_name = parser_shortname(arg)
|
||||
|
||||
if parser_name in parsers:
|
||||
p_info = parser_info(arg, documentation=True)
|
||||
compatible = ', '.join(p_info.get('compatible', ['unknown']))
|
||||
documentation = p_info.get('documentation', 'No documentation available.')
|
||||
version = p_info.get('version', 'unknown')
|
||||
author = p_info.get('author', 'unknown')
|
||||
author_email = p_info.get('author_email', 'unknown')
|
||||
doc_text = \
|
||||
f'{documentation}\n'\
|
||||
f'Compatibility: {compatible}\n\n'\
|
||||
f'Version {version} by {author} ({author_email})\n'
|
||||
|
||||
utils._safe_pager(doc_text)
|
||||
return
|
||||
|
||||
utils._safe_print(helptext(show_hidden=show_hidden))
|
||||
return
|
||||
|
||||
|
||||
def versiontext():
|
||||
"""Return the version text"""
|
||||
py_ver = '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro)))
|
||||
versiontext_string = f'''\
|
||||
jc version: {info.version}
|
||||
python interpreter version: {py_ver}
|
||||
python path: {sys.executable}
|
||||
|
||||
{info.website}
|
||||
{info.copyright}
|
||||
'''
|
||||
return textwrap.dedent(versiontext_string)
|
||||
|
||||
|
||||
def yaml_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, ascii_only=False):
|
||||
"""
|
||||
Return a YAML formatted string. String may include color codes. If the
|
||||
YAML library is not installed, output will fall back to JSON with a
|
||||
warning message to STDERR"""
|
||||
# make ruamel.yaml import optional
|
||||
try:
|
||||
from ruamel.yaml import YAML, representer
|
||||
YAML_INSTALLED = True
|
||||
except Exception:
|
||||
YAML_INSTALLED = False
|
||||
|
||||
if YAML_INSTALLED:
|
||||
y_string_buf = io.BytesIO()
|
||||
|
||||
# monkey patch to disable plugins since we don't use them and in
|
||||
# ruamel.yaml versions prior to 0.17.0 the use of __file__ in the
|
||||
# plugin code is incompatible with the pyoxidizer packager
|
||||
YAML.official_plug_ins = lambda a: []
|
||||
|
||||
# monkey patch to disable aliases
|
||||
representer.RoundTripRepresenter.ignore_aliases = lambda x, y: True
|
||||
|
||||
yaml = YAML()
|
||||
yaml.default_flow_style = False
|
||||
yaml.explicit_start = True
|
||||
yaml.allow_unicode = not ascii_only
|
||||
yaml.encoding = 'utf-8'
|
||||
yaml.dump(data, y_string_buf)
|
||||
y_string = y_string_buf.getvalue().decode('utf-8')[:-1]
|
||||
|
||||
if not mono and not piped_out:
|
||||
# set colors
|
||||
class JcStyle(Style):
|
||||
styles = set_env_colors(env_colors)
|
||||
|
||||
return str(highlight(y_string, YamlLexer(), Terminal256Formatter(style=JcStyle))[0:-1])
|
||||
|
||||
return y_string
|
||||
|
||||
utils.warning_message(['YAML Library not installed. Reverting to JSON output.'])
|
||||
return json_out(data, pretty=pretty, env_colors=env_colors, mono=mono, piped_out=piped_out, ascii_only=ascii_only)
|
||||
|
||||
|
||||
def json_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, ascii_only=False):
|
||||
"""
|
||||
Return a JSON formatted string. String may include color codes or be
|
||||
pretty printed.
|
||||
"""
|
||||
import json
|
||||
|
||||
separators = (',', ':')
|
||||
indent = None
|
||||
|
||||
if pretty:
|
||||
separators = None
|
||||
indent = 2
|
||||
|
||||
j_string = json.dumps(data, indent=indent, separators=separators, ensure_ascii=ascii_only)
|
||||
|
||||
if not mono and not piped_out:
|
||||
# set colors
|
||||
class JcStyle(Style):
|
||||
styles = set_env_colors(env_colors)
|
||||
|
||||
return str(highlight(j_string, JsonLexer(), Terminal256Formatter(style=JcStyle))[0:-1])
|
||||
|
||||
return j_string
|
||||
|
||||
|
||||
def safe_print_out(list_or_dict, pretty=None, env_colors=None, mono=None,
|
||||
piped_out=None, flush=None, yaml=None):
|
||||
"""Safely prints JSON or YAML output in both UTF-8 and ASCII systems"""
|
||||
if yaml:
|
||||
try:
|
||||
print(yaml_out(list_or_dict,
|
||||
pretty=pretty,
|
||||
env_colors=env_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_out),
|
||||
flush=flush)
|
||||
except UnicodeEncodeError:
|
||||
print(yaml_out(list_or_dict,
|
||||
pretty=pretty,
|
||||
env_colors=env_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_out,
|
||||
ascii_only=True),
|
||||
flush=flush)
|
||||
|
||||
else:
|
||||
try:
|
||||
print(json_out(list_or_dict,
|
||||
pretty=pretty,
|
||||
env_colors=env_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_out),
|
||||
flush=flush)
|
||||
except UnicodeEncodeError:
|
||||
print(json_out(list_or_dict,
|
||||
pretty=pretty,
|
||||
env_colors=env_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_out,
|
||||
ascii_only=True),
|
||||
flush=flush)
|
||||
|
||||
|
||||
def magic_parser(args):
|
||||
"""
|
||||
Parse command arguments for magic syntax: jc -p ls -al
|
||||
|
||||
Return a tuple:
|
||||
valid_command (bool) is this a valid cmd? (exists in magic dict)
|
||||
run_command (list) list of the user's cmd to run. None if no cmd.
|
||||
jc_parser (str) parser to use for this user's cmd.
|
||||
jc_options (list) list of jc options
|
||||
"""
|
||||
# bail immediately if there are no args or a parser is defined
|
||||
if len(args) <= 1 or (args[1].startswith('--') and args[1] not in long_options_map):
|
||||
return False, None, None, []
|
||||
|
||||
args_given = args[1:]
|
||||
options = []
|
||||
|
||||
# find the options
|
||||
for arg in list(args_given):
|
||||
# long option found - populate option list
|
||||
if arg in long_options_map:
|
||||
options.extend(long_options_map[arg][0])
|
||||
args_given.pop(0)
|
||||
continue
|
||||
|
||||
# parser found - use standard syntax
|
||||
if arg.startswith('--'):
|
||||
return False, None, None, []
|
||||
|
||||
# option found - populate option list
|
||||
if arg.startswith('-'):
|
||||
options.extend(args_given.pop(0)[1:])
|
||||
|
||||
# command found if iterator didn't already stop - stop iterating
|
||||
else:
|
||||
break
|
||||
|
||||
# if -h, -a, or -v found in options, then bail out
|
||||
if 'h' in options or 'a' in options or 'v' in options:
|
||||
return False, None, None, []
|
||||
|
||||
# all options popped and no command found - for case like 'jc -x'
|
||||
if len(args_given) == 0:
|
||||
return False, None, None, []
|
||||
|
||||
# create a dictionary of magic_commands to their respective parsers.
|
||||
magic_dict = {}
|
||||
for entry in all_parser_info():
|
||||
magic_dict.update({mc: entry['argument'] for mc in entry.get('magic_commands', [])})
|
||||
|
||||
# find the command and parser
|
||||
one_word_command = args_given[0]
|
||||
two_word_command = ' '.join(args_given[0:2])
|
||||
|
||||
# try to get a parser for two_word_command, otherwise get one for one_word_command
|
||||
found_parser = magic_dict.get(two_word_command, magic_dict.get(one_word_command))
|
||||
|
||||
return (
|
||||
bool(found_parser), # was a suitable parser found?
|
||||
args_given, # run_command
|
||||
found_parser, # the parser selected
|
||||
options # jc options to preserve
|
||||
)
|
||||
|
||||
|
||||
def open_text_file(path_string):
|
||||
with open(path_string, 'r') as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
def run_user_command(command):
|
||||
"""
|
||||
Use subprocess to run the user's command. Returns the STDOUT, STDERR,
|
||||
and the Exit Code as a tuple.
|
||||
"""
|
||||
proc = subprocess.Popen(command,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
close_fds=False, # Allows inheriting file descriptors;
|
||||
universal_newlines=True, # useful for process substitution
|
||||
encoding='UTF-8')
|
||||
stdout, stderr = proc.communicate()
|
||||
|
||||
return (
|
||||
stdout or '\n',
|
||||
stderr,
|
||||
proc.returncode
|
||||
)
|
||||
|
||||
|
||||
def combined_exit_code(program_exit=0, jc_exit=0):
|
||||
exit_code = program_exit + jc_exit
|
||||
exit_code = min(exit_code, 255)
|
||||
return exit_code
|
||||
|
||||
|
||||
def add_metadata_to(list_or_dict,
|
||||
runtime=None,
|
||||
run_command=None,
|
||||
magic_exit_code=None,
|
||||
parser_name=None):
|
||||
"""
|
||||
This function mutates a list or dict in place. If the _jc_meta field
|
||||
does not already exist, it will be created with the metadata fields. If
|
||||
the _jc_meta field already exists, the metadata fields will be added to
|
||||
the existing object.
|
||||
|
||||
In the case of an empty list (no data), a dictionary with a _jc_meta
|
||||
object will be added to the list. This way you always get metadata,
|
||||
even if there are no results.
|
||||
"""
|
||||
run_timestamp = runtime.timestamp()
|
||||
|
||||
meta_obj = {
|
||||
'parser': parser_name,
|
||||
'timestamp': run_timestamp
|
||||
}
|
||||
|
||||
if run_command:
|
||||
meta_obj['magic_command'] = run_command
|
||||
meta_obj['magic_command_exit'] = magic_exit_code
|
||||
|
||||
if isinstance(list_or_dict, dict):
|
||||
if '_jc_meta' not in list_or_dict:
|
||||
list_or_dict['_jc_meta'] = {}
|
||||
|
||||
list_or_dict['_jc_meta'].update(meta_obj)
|
||||
|
||||
elif isinstance(list_or_dict, list):
|
||||
if not list_or_dict:
|
||||
list_or_dict.append({})
|
||||
|
||||
for item in list_or_dict:
|
||||
if '_jc_meta' not in item:
|
||||
item['_jc_meta'] = {}
|
||||
|
||||
item['_jc_meta'].update(meta_obj)
|
||||
|
||||
else:
|
||||
utils.error_message(['Parser returned an unsupported object type.'])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
|
||||
def main():
|
||||
# break on ctrl-c keyboard interrupt
|
||||
signal.signal(signal.SIGINT, ctrlc)
|
||||
|
||||
# break on pipe error. need try/except for windows compatibility
|
||||
try:
|
||||
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
# enable colors for Windows cmd.exe terminal
|
||||
if sys.platform.startswith('win32'):
|
||||
os.system('')
|
||||
|
||||
# parse magic syntax first: e.g. jc -p ls -al
|
||||
magic_options = []
|
||||
valid_command, run_command, magic_found_parser, magic_options = magic_parser(sys.argv)
|
||||
|
||||
# set colors
|
||||
jc_colors = os.getenv('JC_COLORS')
|
||||
|
||||
# set options
|
||||
options = []
|
||||
options.extend(magic_options)
|
||||
|
||||
# find options if magic_parser did not find a command
|
||||
if not valid_command:
|
||||
for opt in sys.argv:
|
||||
if opt in long_options_map:
|
||||
options.extend(long_options_map[opt][0])
|
||||
|
||||
if opt.startswith('-') and not opt.startswith('--'):
|
||||
options.extend(opt[1:])
|
||||
|
||||
about = 'a' in options
|
||||
debug = 'd' in options
|
||||
verbose_debug = options.count('d') > 1
|
||||
force_color = 'C' in options
|
||||
mono = ('m' in options or bool(os.getenv('NO_COLOR'))) and not force_color
|
||||
help_me = 'h' in options
|
||||
verbose_help = options.count('h') > 1
|
||||
pretty = 'p' in options
|
||||
quiet = 'q' in options
|
||||
ignore_exceptions = options.count('q') > 1
|
||||
raw = 'r' in options
|
||||
meta_out = 'M' in options
|
||||
unbuffer = 'u' in options
|
||||
version_info = 'v' in options
|
||||
yaml_out = 'y' in options
|
||||
bash_comp = 'B' in options
|
||||
zsh_comp = 'Z' in options
|
||||
|
||||
if verbose_debug:
|
||||
tracebackplus.enable(context=11)
|
||||
|
||||
if not PYGMENTS_INSTALLED:
|
||||
mono = True
|
||||
|
||||
if about:
|
||||
safe_print_out(about_jc(),
|
||||
pretty=pretty,
|
||||
env_colors=jc_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_output(force_color),
|
||||
yaml=yaml_out)
|
||||
sys.exit(0)
|
||||
|
||||
if help_me:
|
||||
help_doc(sys.argv, show_hidden=verbose_help)
|
||||
sys.exit(0)
|
||||
|
||||
if version_info:
|
||||
utils._safe_print(versiontext())
|
||||
sys.exit(0)
|
||||
|
||||
if bash_comp:
|
||||
utils._safe_print(bash_completion())
|
||||
sys.exit(0)
|
||||
|
||||
if zsh_comp:
|
||||
utils._safe_print(zsh_completion())
|
||||
sys.exit(0)
|
||||
|
||||
# if magic syntax used, try to run the command and error if it's not found, etc.
|
||||
magic_stdout, magic_stderr, magic_exit_code = None, None, 0
|
||||
run_command_str = ''
|
||||
if run_command:
|
||||
try:
|
||||
run_command_str = shlex.join(run_command) # python 3.8+
|
||||
except AttributeError:
|
||||
run_command_str = ' '.join(run_command) # older python versions
|
||||
|
||||
if run_command_str.startswith('/proc'):
|
||||
try:
|
||||
magic_found_parser = 'proc'
|
||||
magic_stdout = open_text_file(run_command_str)
|
||||
|
||||
except OSError as e:
|
||||
if debug:
|
||||
raise
|
||||
|
||||
error_msg = os.strerror(e.errno)
|
||||
utils.error_message([
|
||||
f'"{run_command_str}" file could not be opened: {error_msg}.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
except Exception:
|
||||
if debug:
|
||||
raise
|
||||
|
||||
utils.error_message([
|
||||
f'"{run_command_str}" file could not be opened. For details use the -d or -dd option.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
elif valid_command:
|
||||
try:
|
||||
magic_stdout, magic_stderr, magic_exit_code = run_user_command(run_command)
|
||||
if magic_stderr:
|
||||
utils._safe_print(magic_stderr[:-1], file=sys.stderr)
|
||||
|
||||
except OSError as e:
|
||||
if debug:
|
||||
raise
|
||||
|
||||
error_msg = os.strerror(e.errno)
|
||||
utils.error_message([
|
||||
f'"{run_command_str}" command could not be run: {error_msg}.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
except Exception:
|
||||
if debug:
|
||||
raise
|
||||
|
||||
utils.error_message([
|
||||
f'"{run_command_str}" command could not be run. For details use the -d or -dd option.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
elif run_command is not None:
|
||||
utils.error_message([f'"{run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.'])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
# find the correct parser
|
||||
if magic_found_parser:
|
||||
parser = _get_parser(magic_found_parser)
|
||||
parser_name = parser_shortname(magic_found_parser)
|
||||
|
||||
else:
|
||||
found = False
|
||||
for arg in sys.argv:
|
||||
parser_name = parser_shortname(arg)
|
||||
|
||||
if parser_name in parsers:
|
||||
parser = _get_parser(arg)
|
||||
found = True
|
||||
break
|
||||
|
||||
if not found:
|
||||
utils.error_message(['Missing or incorrect arguments. Use "jc -h" for help.'])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
if sys.stdin.isatty() and magic_stdout is None:
|
||||
utils.error_message(['Missing piped data. Use "jc -h" for help.'])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
# parse and print to stdout
|
||||
try:
|
||||
# differentiate between regular and streaming parsers
|
||||
|
||||
# streaming (only supports UTF-8 string data for now)
|
||||
if _parser_is_streaming(parser):
|
||||
result = parser.parse(sys.stdin,
|
||||
raw=raw,
|
||||
quiet=quiet,
|
||||
ignore_exceptions=ignore_exceptions)
|
||||
|
||||
for line in result:
|
||||
if meta_out:
|
||||
run_dt_utc = datetime.now(timezone.utc)
|
||||
add_metadata_to(line, run_dt_utc, run_command, magic_exit_code, parser_name)
|
||||
|
||||
safe_print_out(line,
|
||||
pretty=pretty,
|
||||
env_colors=jc_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_output(force_color),
|
||||
flush=unbuffer,
|
||||
yaml=yaml_out)
|
||||
|
||||
sys.exit(combined_exit_code(magic_exit_code, 0))
|
||||
|
||||
# regular (supports binary and UTF-8 string data)
|
||||
else:
|
||||
data = magic_stdout or sys.stdin.buffer.read()
|
||||
|
||||
# convert to UTF-8, if possible. Otherwise, leave as bytes
|
||||
try:
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
|
||||
result = parser.parse(data,
|
||||
raw=raw,
|
||||
quiet=quiet)
|
||||
|
||||
if meta_out:
|
||||
run_dt_utc = datetime.now(timezone.utc)
|
||||
add_metadata_to(result, run_dt_utc, run_command, magic_exit_code, parser_name)
|
||||
|
||||
safe_print_out(result,
|
||||
pretty=pretty,
|
||||
env_colors=jc_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_output(force_color),
|
||||
flush=unbuffer,
|
||||
yaml=yaml_out)
|
||||
|
||||
sys.exit(combined_exit_code(magic_exit_code, 0))
|
||||
|
||||
except (ParseError, LibraryNotInstalled) as e:
|
||||
if debug:
|
||||
raise
|
||||
|
||||
utils.error_message([
|
||||
f'Parser issue with {parser_name}:', f'{e.__class__.__name__}: {e}',
|
||||
'If this is the correct parser, try setting the locale to C (LC_ALL=C).',
|
||||
f'For details use the -d or -dd option. Use "jc -h --{parser_name}" for help.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
except Exception:
|
||||
if debug:
|
||||
raise
|
||||
|
||||
streaming_msg = ''
|
||||
if _parser_is_streaming(parser):
|
||||
streaming_msg = 'Use the -qq option to ignore streaming parser errors.'
|
||||
|
||||
utils.error_message([
|
||||
f'{parser_name} parser could not parse the input data.',
|
||||
f'{streaming_msg}',
|
||||
'If this is the correct parser, try setting the locale to C (LC_ALL=C).',
|
||||
f'For details use the -d or -dd option. Use "jc -h --{parser_name}" for help.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
540
jc/cli.py
540
jc/cli.py
@ -10,6 +10,7 @@ import textwrap
|
||||
import signal
|
||||
import shlex
|
||||
import subprocess
|
||||
from typing import List, Dict
|
||||
from .lib import (__version__, parser_info, all_parser_info, parsers,
|
||||
_get_parser, _parser_is_streaming, parser_mod_list,
|
||||
standard_parser_mod_list, plugin_parser_mod_list,
|
||||
@ -32,8 +33,6 @@ try:
|
||||
except Exception:
|
||||
PYGMENTS_INSTALLED = False
|
||||
|
||||
JC_ERROR_EXIT = 100
|
||||
|
||||
|
||||
class info():
|
||||
version = __version__
|
||||
@ -54,7 +53,67 @@ if PYGMENTS_INSTALLED:
|
||||
PYGMENT_COLOR = new_pygments_colors
|
||||
|
||||
|
||||
def set_env_colors(env_colors=None):
|
||||
class JcCli():
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.data_in = None
|
||||
self.data_out = None
|
||||
self.current_out_line = None
|
||||
self.options: List[str] = []
|
||||
self.args: List[str] = []
|
||||
self.parser_arg = None
|
||||
self.parser_module = None
|
||||
self.parser_name = None
|
||||
self.indent = 0
|
||||
self.pad = 0
|
||||
self.env_colors: Dict = {}
|
||||
self.documentation = False
|
||||
self.show_hidden = False
|
||||
self.piped_out = False
|
||||
self.ascii_only = False
|
||||
self.flush = False
|
||||
self.json_separators = (',', ':')
|
||||
self.json_indent = None
|
||||
self.path_string = None
|
||||
self.run_command_str = ''
|
||||
self.command = None
|
||||
self.program_exit = 0
|
||||
self.jc_exit = 0
|
||||
self.JC_ERROR_EXIT = 100
|
||||
self.exit_code = 0
|
||||
self.runtime: datetime = datetime.now()
|
||||
self.run_timestamp = None
|
||||
|
||||
# cli options
|
||||
self.about = False
|
||||
self.debug = False
|
||||
self.verbose_debug = False
|
||||
self.force_color = False
|
||||
self.mono = False
|
||||
self.help_me = False
|
||||
self.verbose_help = False
|
||||
self.pretty = False
|
||||
self.quiet = False
|
||||
self.ignore_exceptions = False
|
||||
self.raw = False
|
||||
self.meta_out = False
|
||||
self.unbuffer = False
|
||||
self.version_info = False
|
||||
self.yaml_output = False
|
||||
self.bash_comp = False
|
||||
self.zsh_comp = False
|
||||
|
||||
# Magic options
|
||||
self.valid_command = False
|
||||
self.run_command = None
|
||||
self.magic_found_parser = None
|
||||
self.magic_options: List[str] = []
|
||||
self.magic_stdout = None,
|
||||
self.magic_stderr = None,
|
||||
self.magic_returncode = None
|
||||
|
||||
|
||||
def set_env_colors(self):
|
||||
"""
|
||||
Return a dictionary to be used in Pygments custom style class.
|
||||
|
||||
@ -76,8 +135,8 @@ def set_env_colors(env_colors=None):
|
||||
"""
|
||||
input_error = False
|
||||
|
||||
if env_colors:
|
||||
color_list = env_colors.split(',')
|
||||
if self.env_colors:
|
||||
color_list = self.env_colors.split(',')
|
||||
else:
|
||||
color_list = ['default', 'default', 'default', 'default']
|
||||
|
||||
@ -94,49 +153,40 @@ def set_env_colors(env_colors=None):
|
||||
color_list = ['default', 'default', 'default', 'default']
|
||||
|
||||
# Try the color set in the JC_COLORS env variable first. If it is set to default, then fall back to default colors
|
||||
return {
|
||||
self.set_env_colors = {
|
||||
Name.Tag: f'bold {PYGMENT_COLOR[color_list[0]]}' if color_list[0] != 'default' else f"bold {PYGMENT_COLOR['blue']}", # key names
|
||||
Keyword: PYGMENT_COLOR[color_list[1]] if color_list[1] != 'default' else PYGMENT_COLOR['brightblack'], # true, false, null
|
||||
Number: PYGMENT_COLOR[color_list[2]] if color_list[2] != 'default' else PYGMENT_COLOR['magenta'], # numbers
|
||||
String: PYGMENT_COLOR[color_list[3]] if color_list[3] != 'default' else PYGMENT_COLOR['green'] # strings
|
||||
}
|
||||
|
||||
|
||||
def piped_output(force_color):
|
||||
def piped_output(self):
|
||||
"""
|
||||
Return False if `STDOUT` is a TTY. True if output is being piped to
|
||||
another program and foce_color is True. This allows forcing of ANSI
|
||||
color codes even when using pipes.
|
||||
"""
|
||||
return not sys.stdout.isatty() and not force_color
|
||||
return not sys.stdout.isatty() and not self.force_color
|
||||
|
||||
|
||||
def ctrlc(signum, frame):
|
||||
"""Exit with error on SIGINT"""
|
||||
sys.exit(JC_ERROR_EXIT)
|
||||
|
||||
|
||||
def parser_shortname(parser_arg):
|
||||
def parser_shortname(self):
|
||||
"""Return short name of the parser with dashes and no -- prefix"""
|
||||
return parser_arg[2:]
|
||||
return self.parser_arg[2:]
|
||||
|
||||
|
||||
def parsers_text(indent=0, pad=0, show_hidden=False):
|
||||
def parsers_text(self):
|
||||
"""Return the argument and description information from each parser"""
|
||||
ptext = ''
|
||||
padding_char = ' '
|
||||
for p in all_parser_info(show_hidden=show_hidden):
|
||||
for p in all_parser_info(show_hidden=self.show_hidden):
|
||||
parser_arg = p.get('argument', 'UNKNOWN')
|
||||
padding = pad - len(parser_arg)
|
||||
padding = self.pad - len(parser_arg)
|
||||
parser_desc = p.get('description', 'No description available.')
|
||||
indent_text = padding_char * indent
|
||||
indent_text = padding_char * self.indent
|
||||
padding_text = padding_char * padding
|
||||
ptext += indent_text + parser_arg + padding_text + parser_desc + '\n'
|
||||
|
||||
return ptext
|
||||
|
||||
|
||||
def options_text(indent=0, pad=0):
|
||||
def options_text(self):
|
||||
"""Return the argument and description information from each option"""
|
||||
otext = ''
|
||||
padding_char = ' '
|
||||
@ -144,15 +194,15 @@ def options_text(indent=0, pad=0):
|
||||
o_short = '-' + long_options_map[option][0]
|
||||
o_desc = long_options_map[option][1]
|
||||
o_combined = o_short + ', ' + option
|
||||
padding = pad - len(o_combined)
|
||||
indent_text = padding_char * indent
|
||||
padding = self.pad - len(o_combined)
|
||||
indent_text = padding_char * self.indent
|
||||
padding_text = padding_char * padding
|
||||
otext += indent_text + o_combined + padding_text + o_desc + '\n'
|
||||
|
||||
return otext
|
||||
|
||||
|
||||
def about_jc():
|
||||
@staticmethod
|
||||
def about_jc():
|
||||
"""Return jc info and the contents of each parser.info as a dictionary"""
|
||||
return {
|
||||
'name': 'jc',
|
||||
@ -172,11 +222,12 @@ def about_jc():
|
||||
'parsers': all_parser_info(show_hidden=True)
|
||||
}
|
||||
|
||||
|
||||
def helptext(show_hidden=False):
|
||||
def helptext(self):
|
||||
"""Return the help text with the list of parsers"""
|
||||
parsers_string = parsers_text(indent=4, pad=20, show_hidden=show_hidden)
|
||||
options_string = options_text(indent=4, pad=20)
|
||||
self.indent = 4
|
||||
self.pad = 20
|
||||
parsers_string = self.parsers_text()
|
||||
options_string = self.options_text()
|
||||
|
||||
helptext_string = f'''\
|
||||
jc converts the output of many commands, file-types, and strings to JSON or YAML
|
||||
@ -219,35 +270,36 @@ Examples:
|
||||
|
||||
return helptext_string
|
||||
|
||||
|
||||
def help_doc(options, show_hidden=False):
|
||||
def help_doc(self):
|
||||
"""
|
||||
Returns the parser documentation if a parser is found in the arguments,
|
||||
otherwise the general help text is returned.
|
||||
"""
|
||||
for arg in options:
|
||||
parser_name = parser_shortname(arg)
|
||||
for arg in self.options:
|
||||
self.parser_arg = arg
|
||||
parser_name = self.parser_shortname()
|
||||
|
||||
if parser_name in parsers:
|
||||
p_info = parser_info(arg, documentation=True)
|
||||
self.documentation = True
|
||||
p_info = parser_info()
|
||||
compatible = ', '.join(p_info.get('compatible', ['unknown']))
|
||||
documentation = p_info.get('documentation', 'No documentation available.')
|
||||
docs = p_info.get('documentation', 'No documentation available.')
|
||||
version = p_info.get('version', 'unknown')
|
||||
author = p_info.get('author', 'unknown')
|
||||
author_email = p_info.get('author_email', 'unknown')
|
||||
doc_text = \
|
||||
f'{documentation}\n'\
|
||||
f'{docs}\n'\
|
||||
f'Compatibility: {compatible}\n\n'\
|
||||
f'Version {version} by {author} ({author_email})\n'
|
||||
|
||||
utils._safe_pager(doc_text)
|
||||
return
|
||||
|
||||
utils._safe_print(helptext(show_hidden=show_hidden))
|
||||
utils._safe_print(self.helptext())
|
||||
return
|
||||
|
||||
|
||||
def versiontext():
|
||||
@staticmethod
|
||||
def versiontext():
|
||||
"""Return the version text"""
|
||||
py_ver = '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro)))
|
||||
versiontext_string = f'''\
|
||||
@ -261,7 +313,7 @@ def versiontext():
|
||||
return textwrap.dedent(versiontext_string)
|
||||
|
||||
|
||||
def yaml_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, ascii_only=False):
|
||||
def yaml_out(self):
|
||||
"""
|
||||
Return a YAML formatted string. String may include color codes. If the
|
||||
YAML library is not installed, output will fall back to JSON with a
|
||||
@ -287,89 +339,65 @@ def yaml_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, a
|
||||
yaml = YAML()
|
||||
yaml.default_flow_style = False
|
||||
yaml.explicit_start = True
|
||||
yaml.allow_unicode = not ascii_only
|
||||
yaml.allow_unicode = not self.ascii_only
|
||||
yaml.encoding = 'utf-8'
|
||||
yaml.dump(data, y_string_buf)
|
||||
yaml.dump(self.data_out, y_string_buf)
|
||||
y_string = y_string_buf.getvalue().decode('utf-8')[:-1]
|
||||
|
||||
if not mono and not piped_out:
|
||||
if not self.mono and not self.piped_out:
|
||||
# set colors
|
||||
class JcStyle(Style):
|
||||
styles = set_env_colors(env_colors)
|
||||
styles = self.set_env_colors()
|
||||
|
||||
return str(highlight(y_string, YamlLexer(), Terminal256Formatter(style=JcStyle))[0:-1])
|
||||
|
||||
return y_string
|
||||
|
||||
utils.warning_message(['YAML Library not installed. Reverting to JSON output.'])
|
||||
return json_out(data, pretty=pretty, env_colors=env_colors, mono=mono, piped_out=piped_out, ascii_only=ascii_only)
|
||||
return self.json_out()
|
||||
|
||||
|
||||
def json_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, ascii_only=False):
|
||||
def json_out(self):
|
||||
"""
|
||||
Return a JSON formatted string. String may include color codes or be
|
||||
pretty printed.
|
||||
"""
|
||||
import json
|
||||
|
||||
separators = (',', ':')
|
||||
indent = None
|
||||
if self.pretty:
|
||||
self.json_indent = 2
|
||||
self.json_separators = None
|
||||
|
||||
if pretty:
|
||||
separators = None
|
||||
indent = 2
|
||||
j_string = json.dumps(self.data_out,
|
||||
indent=self.json_indent,
|
||||
separators=self.json_separators,
|
||||
ensure_ascii=self.ascii_only)
|
||||
|
||||
j_string = json.dumps(data, indent=indent, separators=separators, ensure_ascii=ascii_only)
|
||||
|
||||
if not mono and not piped_out:
|
||||
if not self.mono and not self.piped_out:
|
||||
# set colors
|
||||
class JcStyle(Style):
|
||||
styles = set_env_colors(env_colors)
|
||||
styles = self.set_env_colors()
|
||||
|
||||
return str(highlight(j_string, JsonLexer(), Terminal256Formatter(style=JcStyle))[0:-1])
|
||||
|
||||
return j_string
|
||||
|
||||
|
||||
def safe_print_out(list_or_dict, pretty=None, env_colors=None, mono=None,
|
||||
piped_out=None, flush=None, yaml=None):
|
||||
def safe_print_out(self):
|
||||
"""Safely prints JSON or YAML output in both UTF-8 and ASCII systems"""
|
||||
if yaml:
|
||||
if self.yaml_output:
|
||||
try:
|
||||
print(yaml_out(list_or_dict,
|
||||
pretty=pretty,
|
||||
env_colors=env_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_out),
|
||||
flush=flush)
|
||||
print(self.yaml_out(), flush=self.flush)
|
||||
except UnicodeEncodeError:
|
||||
print(yaml_out(list_or_dict,
|
||||
pretty=pretty,
|
||||
env_colors=env_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_out,
|
||||
ascii_only=True),
|
||||
flush=flush)
|
||||
self.ascii_only = True
|
||||
print(self.yaml_out(), flush=self.flush)
|
||||
|
||||
else:
|
||||
try:
|
||||
print(json_out(list_or_dict,
|
||||
pretty=pretty,
|
||||
env_colors=env_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_out),
|
||||
flush=flush)
|
||||
print(self.json_out(), flush=self.flush)
|
||||
except UnicodeEncodeError:
|
||||
print(json_out(list_or_dict,
|
||||
pretty=pretty,
|
||||
env_colors=env_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_out,
|
||||
ascii_only=True),
|
||||
flush=flush)
|
||||
self.ascii_only = True
|
||||
print(self.json_out(), flush=self.flush)
|
||||
|
||||
|
||||
def magic_parser(args):
|
||||
def magic_parser(self):
|
||||
"""
|
||||
Parse command arguments for magic syntax: jc -p ls -al
|
||||
|
||||
@ -380,17 +408,16 @@ def magic_parser(args):
|
||||
jc_options (list) list of jc options
|
||||
"""
|
||||
# bail immediately if there are no args or a parser is defined
|
||||
if len(args) <= 1 or (args[1].startswith('--') and args[1] not in long_options_map):
|
||||
return False, None, None, []
|
||||
if len(self.args) <= 1 or (self.args[1].startswith('--') and self.args[1] not in long_options_map):
|
||||
pass
|
||||
|
||||
args_given = args[1:]
|
||||
options = []
|
||||
args_given = self.args[1:]
|
||||
|
||||
# find the options
|
||||
for arg in list(args_given):
|
||||
# long option found - populate option list
|
||||
if arg in long_options_map:
|
||||
options.extend(long_options_map[arg][0])
|
||||
self.magic_options.extend(long_options_map[arg][0])
|
||||
args_given.pop(0)
|
||||
continue
|
||||
|
||||
@ -400,14 +427,14 @@ def magic_parser(args):
|
||||
|
||||
# option found - populate option list
|
||||
if arg.startswith('-'):
|
||||
options.extend(args_given.pop(0)[1:])
|
||||
self.magic_options.extend(args_given.pop(0)[1:])
|
||||
|
||||
# command found if iterator didn't already stop - stop iterating
|
||||
else:
|
||||
break
|
||||
|
||||
# if -h, -a, or -v found in options, then bail out
|
||||
if 'h' in options or 'a' in options or 'v' in options:
|
||||
if 'h' in self.magic_options or 'a' in self.magic_options or 'v' in self.magic_options:
|
||||
return False, None, None, []
|
||||
|
||||
# all options popped and no command found - for case like 'jc -x'
|
||||
@ -424,52 +451,37 @@ def magic_parser(args):
|
||||
two_word_command = ' '.join(args_given[0:2])
|
||||
|
||||
# try to get a parser for two_word_command, otherwise get one for one_word_command
|
||||
found_parser = magic_dict.get(two_word_command, magic_dict.get(one_word_command))
|
||||
self.magic_found_parser = magic_dict.get(two_word_command, magic_dict.get(one_word_command))
|
||||
|
||||
return (
|
||||
bool(found_parser), # was a suitable parser found?
|
||||
args_given, # run_command
|
||||
found_parser, # the parser selected
|
||||
options # jc options to preserve
|
||||
)
|
||||
# set the instance variables
|
||||
self.valid_command = bool(self.magic_found_parser)
|
||||
self.run_command = args_given
|
||||
|
||||
|
||||
def open_text_file(path_string):
|
||||
with open(path_string, 'r') as f:
|
||||
def open_text_file(self):
|
||||
with open(self.path_string, 'r') as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
def run_user_command(command):
|
||||
def run_user_command(self):
|
||||
"""
|
||||
Use subprocess to run the user's command. Returns the STDOUT, STDERR,
|
||||
and the Exit Code as a tuple.
|
||||
"""
|
||||
proc = subprocess.Popen(command,
|
||||
proc = subprocess.Popen(self.command,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
close_fds=False, # Allows inheriting file descriptors;
|
||||
universal_newlines=True, # useful for process substitution
|
||||
encoding='UTF-8')
|
||||
stdout, stderr = proc.communicate()
|
||||
self.magic_stdout, self.magic_stderr = proc.communicate()
|
||||
|
||||
return (
|
||||
stdout or '\n',
|
||||
stderr,
|
||||
proc.returncode
|
||||
)
|
||||
self.magic_stdout = self.magic_stdout or '\n'
|
||||
self.magic_returncode = proc.returncode
|
||||
|
||||
def combined_exit_code(self):
|
||||
self.exit_code = self.program_exit + self.jc_exit
|
||||
self.exit_code = min(self.exit_code, 255)
|
||||
|
||||
def combined_exit_code(program_exit=0, jc_exit=0):
|
||||
exit_code = program_exit + jc_exit
|
||||
exit_code = min(exit_code, 255)
|
||||
return exit_code
|
||||
|
||||
|
||||
def add_metadata_to(list_or_dict,
|
||||
runtime=None,
|
||||
run_command=None,
|
||||
magic_exit_code=None,
|
||||
parser_name=None):
|
||||
def add_metadata_to_output(self):
|
||||
"""
|
||||
This function mutates a list or dict in place. If the _jc_meta field
|
||||
does not already exist, it will be created with the metadata fields. If
|
||||
@ -480,28 +492,28 @@ def add_metadata_to(list_or_dict,
|
||||
object will be added to the list. This way you always get metadata,
|
||||
even if there are no results.
|
||||
"""
|
||||
run_timestamp = runtime.timestamp()
|
||||
self.run_timestamp = self.runtime.timestamp()
|
||||
|
||||
meta_obj = {
|
||||
'parser': parser_name,
|
||||
'timestamp': run_timestamp
|
||||
'parser': self.parser_name,
|
||||
'timestamp': self.run_timestamp
|
||||
}
|
||||
|
||||
if run_command:
|
||||
meta_obj['magic_command'] = run_command
|
||||
meta_obj['magic_command_exit'] = magic_exit_code
|
||||
if self.run_command:
|
||||
meta_obj['magic_command'] = self.run_command
|
||||
meta_obj['magic_command_exit'] = self.magic_exit_code
|
||||
|
||||
if isinstance(list_or_dict, dict):
|
||||
if '_jc_meta' not in list_or_dict:
|
||||
list_or_dict['_jc_meta'] = {}
|
||||
if isinstance(self.data_out, dict):
|
||||
if '_jc_meta' not in self.data_out:
|
||||
self.data_out['_jc_meta'] = {}
|
||||
|
||||
list_or_dict['_jc_meta'].update(meta_obj)
|
||||
self.data_out['_jc_meta'].update(meta_obj)
|
||||
|
||||
elif isinstance(list_or_dict, list):
|
||||
if not list_or_dict:
|
||||
list_or_dict.append({})
|
||||
elif isinstance(self.data_out, list):
|
||||
if not self.data_out:
|
||||
self.data_out.append({})
|
||||
|
||||
for item in list_or_dict:
|
||||
for item in self.data_out:
|
||||
if '_jc_meta' not in item:
|
||||
item['_jc_meta'] = {}
|
||||
|
||||
@ -509,12 +521,19 @@ def add_metadata_to(list_or_dict,
|
||||
|
||||
else:
|
||||
utils.error_message(['Parser returned an unsupported object type.'])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
self.jc_error = self.JC_ERROR_EXIT
|
||||
sys.exit(self.combined_exit_code())
|
||||
|
||||
def ctrlc(self, signum, frame):
|
||||
"""Exit with error on SIGINT"""
|
||||
sys.exit(self.JC_ERROR_EXIT)
|
||||
|
||||
|
||||
def main():
|
||||
cli = JcCli()
|
||||
|
||||
# break on ctrl-c keyboard interrupt
|
||||
signal.signal(signal.SIGINT, ctrlc)
|
||||
signal.signal(signal.SIGINT, cli.ctrlc)
|
||||
|
||||
# break on pipe error. need try/except for windows compatibility
|
||||
try:
|
||||
@ -527,240 +546,233 @@ def main():
|
||||
os.system('')
|
||||
|
||||
# parse magic syntax first: e.g. jc -p ls -al
|
||||
magic_options = []
|
||||
valid_command, run_command, magic_found_parser, magic_options = magic_parser(sys.argv)
|
||||
cli.args = sys.argv
|
||||
cli.magic_parser()
|
||||
|
||||
# set colors
|
||||
jc_colors = os.getenv('JC_COLORS')
|
||||
cli.env_colors = os.getenv('JC_COLORS')
|
||||
|
||||
# set options
|
||||
options = []
|
||||
options.extend(magic_options)
|
||||
cli.options.extend(cli.magic_options)
|
||||
|
||||
# find options if magic_parser did not find a command
|
||||
if not valid_command:
|
||||
for opt in sys.argv:
|
||||
if not cli.valid_command:
|
||||
for opt in cli.args:
|
||||
if opt in long_options_map:
|
||||
options.extend(long_options_map[opt][0])
|
||||
cli.options.extend(long_options_map[opt][0])
|
||||
|
||||
if opt.startswith('-') and not opt.startswith('--'):
|
||||
options.extend(opt[1:])
|
||||
cli.options.extend(opt[1:])
|
||||
|
||||
about = 'a' in options
|
||||
debug = 'd' in options
|
||||
verbose_debug = options.count('d') > 1
|
||||
force_color = 'C' in options
|
||||
mono = ('m' in options or bool(os.getenv('NO_COLOR'))) and not force_color
|
||||
help_me = 'h' in options
|
||||
verbose_help = options.count('h') > 1
|
||||
pretty = 'p' in options
|
||||
quiet = 'q' in options
|
||||
ignore_exceptions = options.count('q') > 1
|
||||
raw = 'r' in options
|
||||
meta_out = 'M' in options
|
||||
unbuffer = 'u' in options
|
||||
version_info = 'v' in options
|
||||
yaml_out = 'y' in options
|
||||
bash_comp = 'B' in options
|
||||
zsh_comp = 'Z' in options
|
||||
cli.about = 'a' in cli.options
|
||||
cli.debug = 'd' in cli.options
|
||||
cli.verbose_debug = cli.options.count('d') > 1
|
||||
cli.force_color = 'C' in cli.options
|
||||
cli.mono = ('m' in cli.options or bool(os.getenv('NO_COLOR'))) and not cli.force_color
|
||||
cli.help_me = 'h' in cli.options
|
||||
cli.verbose_help = cli.options.count('h') > 1
|
||||
cli.pretty = 'p' in cli.options
|
||||
cli.quiet = 'q' in cli.options
|
||||
cli.ignore_exceptions = cli.options.count('q') > 1
|
||||
cli.raw = 'r' in cli.options
|
||||
cli.meta_out = 'M' in cli.options
|
||||
cli.unbuffer = 'u' in cli.options
|
||||
cli.version_info = 'v' in cli.options
|
||||
cli.yaml_output = 'y' in cli.options
|
||||
cli.bash_comp = 'B' in cli.options
|
||||
cli.zsh_comp = 'Z' in cli.options
|
||||
|
||||
if verbose_debug:
|
||||
if cli.verbose_debug:
|
||||
tracebackplus.enable(context=11)
|
||||
|
||||
if not PYGMENTS_INSTALLED:
|
||||
mono = True
|
||||
cli.mono = True
|
||||
|
||||
if about:
|
||||
safe_print_out(about_jc(),
|
||||
pretty=pretty,
|
||||
env_colors=jc_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_output(force_color),
|
||||
yaml=yaml_out)
|
||||
if cli.about:
|
||||
cli.data_out = cli.about_jc()
|
||||
cli.safe_print_out()
|
||||
sys.exit(0)
|
||||
|
||||
if help_me:
|
||||
help_doc(sys.argv, show_hidden=verbose_help)
|
||||
if cli.help_me:
|
||||
cli.help_doc()
|
||||
sys.exit(0)
|
||||
|
||||
if version_info:
|
||||
utils._safe_print(versiontext())
|
||||
if cli.version_info:
|
||||
utils._safe_print(cli.versiontext())
|
||||
sys.exit(0)
|
||||
|
||||
if bash_comp:
|
||||
if cli.bash_comp:
|
||||
utils._safe_print(bash_completion())
|
||||
sys.exit(0)
|
||||
|
||||
if zsh_comp:
|
||||
if cli.zsh_comp:
|
||||
utils._safe_print(zsh_completion())
|
||||
sys.exit(0)
|
||||
|
||||
# if magic syntax used, try to run the command and error if it's not found, etc.
|
||||
magic_stdout, magic_stderr, magic_exit_code = None, None, 0
|
||||
run_command_str = ''
|
||||
if run_command:
|
||||
if cli.run_command:
|
||||
try:
|
||||
run_command_str = shlex.join(run_command) # python 3.8+
|
||||
cli.run_command_str = shlex.join(cli.run_command) # python 3.8+
|
||||
except AttributeError:
|
||||
run_command_str = ' '.join(run_command) # older python versions
|
||||
cli.run_command_str = ' '.join(cli.run_command) # older python versions
|
||||
|
||||
if run_command_str.startswith('/proc'):
|
||||
if cli.run_command_str.startswith('/proc'):
|
||||
try:
|
||||
magic_found_parser = 'proc'
|
||||
magic_stdout = open_text_file(run_command_str)
|
||||
cli.magic_found_parser = 'proc'
|
||||
cli.stdout = cli.open_text_file()
|
||||
|
||||
except OSError as e:
|
||||
if debug:
|
||||
if cli.debug:
|
||||
raise
|
||||
|
||||
error_msg = os.strerror(e.errno)
|
||||
utils.error_message([
|
||||
f'"{run_command_str}" file could not be opened: {error_msg}.'
|
||||
f'"{cli.run_command_str}" file could not be opened: {error_msg}.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
cli.jc_error = cli.JC_ERROR_EXIT
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
except Exception:
|
||||
if debug:
|
||||
if cli.debug:
|
||||
raise
|
||||
|
||||
utils.error_message([
|
||||
f'"{run_command_str}" file could not be opened. For details use the -d or -dd option.'
|
||||
f'"{cli.run_command_str}" file could not be opened. For details use the -d or -dd option.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
cli.jc_error = cli.JC_ERROR_EXIT
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
elif valid_command:
|
||||
elif cli.valid_command:
|
||||
try:
|
||||
magic_stdout, magic_stderr, magic_exit_code = run_user_command(run_command)
|
||||
if magic_stderr:
|
||||
utils._safe_print(magic_stderr[:-1], file=sys.stderr)
|
||||
cli.run_user_command()
|
||||
if cli.magic_stderr:
|
||||
utils._safe_print(cli.magic_stderr[:-1], file=sys.stderr)
|
||||
|
||||
except OSError as e:
|
||||
if debug:
|
||||
if cli.debug:
|
||||
raise
|
||||
|
||||
error_msg = os.strerror(e.errno)
|
||||
utils.error_message([
|
||||
f'"{run_command_str}" command could not be run: {error_msg}.'
|
||||
f'"{cli.run_command_str}" command could not be run: {error_msg}.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
cli.jc_error = cli.JC_ERROR_EXIT
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
except Exception:
|
||||
if debug:
|
||||
if cli.debug:
|
||||
raise
|
||||
|
||||
utils.error_message([
|
||||
f'"{run_command_str}" command could not be run. For details use the -d or -dd option.'
|
||||
f'"{cli.run_command_str}" command could not be run. For details use the -d or -dd option.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
cli.jc_error = cli.JC_ERROR_EXIT
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
elif run_command is not None:
|
||||
utils.error_message([f'"{run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.'])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
elif cli.run_command is not None:
|
||||
utils.error_message([f'"{cli.run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.'])
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
# find the correct parser
|
||||
if magic_found_parser:
|
||||
parser = _get_parser(magic_found_parser)
|
||||
parser_name = parser_shortname(magic_found_parser)
|
||||
if cli.magic_found_parser:
|
||||
cli.parser_module = _get_parser(cli.magic_found_parser)
|
||||
cli.parser_name = cli.parser_shortname()
|
||||
|
||||
else:
|
||||
found = False
|
||||
for arg in sys.argv:
|
||||
parser_name = parser_shortname(arg)
|
||||
for arg in cli.args:
|
||||
cli.parser_arg = arg
|
||||
cli.parser_name = cli.parser_shortname()
|
||||
|
||||
if parser_name in parsers:
|
||||
parser = _get_parser(arg)
|
||||
if cli.parser_name in parsers:
|
||||
cli.parser_module = _get_parser(cli.parser_arg)
|
||||
found = True
|
||||
break
|
||||
|
||||
if not found:
|
||||
utils.error_message(['Missing or incorrect arguments. Use "jc -h" for help.'])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
cli.jc_error = cli.JC_ERROR_EXIT
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
if sys.stdin.isatty() and magic_stdout is None:
|
||||
if sys.stdin.isatty() and cli.stdout is None:
|
||||
utils.error_message(['Missing piped data. Use "jc -h" for help.'])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
cli.jc_error = cli.JC_ERROR_EXIT
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
# parse and print to stdout
|
||||
try:
|
||||
# differentiate between regular and streaming parsers
|
||||
|
||||
# streaming (only supports UTF-8 string data for now)
|
||||
if _parser_is_streaming(parser):
|
||||
result = parser.parse(sys.stdin,
|
||||
raw=raw,
|
||||
quiet=quiet,
|
||||
ignore_exceptions=ignore_exceptions)
|
||||
if _parser_is_streaming(cli.parser_module):
|
||||
cli.data_in = sys.stdin
|
||||
result = cli.parser_module.parse(cli.data_in,
|
||||
raw=cli.raw,
|
||||
quiet=cli.quiet,
|
||||
ignore_exceptions=cli.ignore_exceptions)
|
||||
|
||||
for line in result:
|
||||
if meta_out:
|
||||
run_dt_utc = datetime.now(timezone.utc)
|
||||
add_metadata_to(line, run_dt_utc, run_command, magic_exit_code, parser_name)
|
||||
cli.data_out = line
|
||||
if cli.meta_out:
|
||||
cli.run_timestamp = datetime.now(timezone.utc)
|
||||
cli.add_metadata_to_output()
|
||||
|
||||
safe_print_out(line,
|
||||
pretty=pretty,
|
||||
env_colors=jc_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_output(force_color),
|
||||
flush=unbuffer,
|
||||
yaml=yaml_out)
|
||||
cli.safe_print_out()
|
||||
|
||||
sys.exit(combined_exit_code(magic_exit_code, 0))
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
# regular (supports binary and UTF-8 string data)
|
||||
else:
|
||||
data = magic_stdout or sys.stdin.buffer.read()
|
||||
print(cli.stdout)
|
||||
cli.data_in = cli.stdout or sys.stdin.buffer.read()
|
||||
|
||||
# convert to UTF-8, if possible. Otherwise, leave as bytes
|
||||
try:
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode('utf-8')
|
||||
if isinstance(cli.data_in, bytes):
|
||||
cli.data_in = cli.data_in.decode('utf-8')
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
|
||||
result = parser.parse(data,
|
||||
raw=raw,
|
||||
quiet=quiet)
|
||||
cli.data_out = cli.parser_module.parse(cli.data_in,
|
||||
raw=cli.raw,
|
||||
quiet=cli.quiet)
|
||||
|
||||
if meta_out:
|
||||
run_dt_utc = datetime.now(timezone.utc)
|
||||
add_metadata_to(result, run_dt_utc, run_command, magic_exit_code, parser_name)
|
||||
if cli.meta_out:
|
||||
cli.run_timestamp = datetime.now(timezone.utc)
|
||||
cli.add_metadata_to_output()
|
||||
|
||||
safe_print_out(result,
|
||||
pretty=pretty,
|
||||
env_colors=jc_colors,
|
||||
mono=mono,
|
||||
piped_out=piped_output(force_color),
|
||||
flush=unbuffer,
|
||||
yaml=yaml_out)
|
||||
cli.safe_print_out()
|
||||
|
||||
sys.exit(combined_exit_code(magic_exit_code, 0))
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
except (ParseError, LibraryNotInstalled) as e:
|
||||
if debug:
|
||||
if cli.debug:
|
||||
raise
|
||||
|
||||
utils.error_message([
|
||||
f'Parser issue with {parser_name}:', f'{e.__class__.__name__}: {e}',
|
||||
f'Parser issue with {cli.parser_name}:', f'{e.__class__.__name__}: {e}',
|
||||
'If this is the correct parser, try setting the locale to C (LC_ALL=C).',
|
||||
f'For details use the -d or -dd option. Use "jc -h --{parser_name}" for help.'
|
||||
f'For details use the -d or -dd option. Use "jc -h --{cli.parser_name}" for help.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
cli.jc_error = cli.JC_ERROR_EXIT
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
except Exception:
|
||||
if debug:
|
||||
if cli.debug:
|
||||
raise
|
||||
|
||||
streaming_msg = ''
|
||||
if _parser_is_streaming(parser):
|
||||
if _parser_is_streaming(cli.parser_module):
|
||||
streaming_msg = 'Use the -qq option to ignore streaming parser errors.'
|
||||
|
||||
utils.error_message([
|
||||
f'{parser_name} parser could not parse the input data.',
|
||||
f'{cli.parser_name} parser could not parse the input data.',
|
||||
f'{streaming_msg}',
|
||||
'If this is the correct parser, try setting the locale to C (LC_ALL=C).',
|
||||
f'For details use the -d or -dd option. Use "jc -h --{parser_name}" for help.'
|
||||
f'For details use the -d or -dd option. Use "jc -h --{cli.parser_name}" for help.'
|
||||
])
|
||||
sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT))
|
||||
cli.jc_error = cli.JC_ERROR_EXIT
|
||||
sys.exit(cli.combined_exit_code())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
Reference in New Issue
Block a user