From cb9979ac9429dfda18f0606e11e64c751a3fad24 Mon Sep 17 00:00:00 2001 From: Kelly Brazil Date: Sun, 2 Oct 2022 16:58:20 -0700 Subject: [PATCH] refactor cli --- jc/cli.old.py | 767 +++++++++++++++++++++++++++++++++++++++ jc/cli.py | 986 +++++++++++++++++++++++++------------------------- 2 files changed, 1266 insertions(+), 487 deletions(-) create mode 100644 jc/cli.old.py diff --git a/jc/cli.old.py b/jc/cli.old.py new file mode 100644 index 00000000..fb2ecbb2 --- /dev/null +++ b/jc/cli.old.py @@ -0,0 +1,767 @@ +"""jc - JSON Convert +JC cli module +""" + +import io +import sys +import os +from datetime import datetime, timezone +import textwrap +import signal +import shlex +import subprocess +from .lib import (__version__, parser_info, all_parser_info, parsers, + _get_parser, _parser_is_streaming, parser_mod_list, + standard_parser_mod_list, plugin_parser_mod_list, + streaming_parser_mod_list) +from . import utils +from .cli_data import long_options_map, new_pygments_colors, old_pygments_colors +from .shell_completions import bash_completion, zsh_completion +from . import tracebackplus +from .exceptions import LibraryNotInstalled, ParseError + +# make pygments import optional +try: + import pygments + from pygments import highlight + from pygments.style import Style + from pygments.token import (Name, Number, String, Keyword) + from pygments.lexers.data import JsonLexer, YamlLexer + from pygments.formatters import Terminal256Formatter + PYGMENTS_INSTALLED = True +except Exception: + PYGMENTS_INSTALLED = False + +JC_ERROR_EXIT = 100 + + +class info(): + version = __version__ + description = 'JSON Convert' + author = 'Kelly Brazil' + author_email = 'kellyjonbrazil@gmail.com' + website = 'https://github.com/kellyjonbrazil/jc' + copyright = '© 2019-2022 Kelly Brazil' + license = 'MIT License' + + +# We only support 2.3.0+, pygments changed color names in 2.4.0. +# startswith is sufficient and avoids potential exceptions from split and int. +if PYGMENTS_INSTALLED: + if pygments.__version__.startswith('2.3.'): + PYGMENT_COLOR = old_pygments_colors + else: + PYGMENT_COLOR = new_pygments_colors + + +def set_env_colors(env_colors=None): + """ + Return a dictionary to be used in Pygments custom style class. + + Grab custom colors from JC_COLORS environment variable. JC_COLORS env + variable takes 4 comma separated string values and should be in the + format of: + + JC_COLORS=,,, + + Where colors are: black, red, green, yellow, blue, magenta, cyan, gray, + brightblack, brightred, brightgreen, brightyellow, + brightblue, brightmagenta, brightcyan, white, default + + Default colors: + + JC_COLORS=blue,brightblack,magenta,green + or + JC_COLORS=default,default,default,default + """ + input_error = False + + if env_colors: + color_list = env_colors.split(',') + else: + color_list = ['default', 'default', 'default', 'default'] + + if len(color_list) != 4: + input_error = True + + for color in color_list: + if color != 'default' and color not in PYGMENT_COLOR: + input_error = True + + # if there is an issue with the env variable, just set all colors to default and move on + if input_error: + utils.warning_message(['Could not parse JC_COLORS environment variable']) + color_list = ['default', 'default', 'default', 'default'] + + # Try the color set in the JC_COLORS env variable first. If it is set to default, then fall back to default colors + return { + Name.Tag: f'bold {PYGMENT_COLOR[color_list[0]]}' if color_list[0] != 'default' else f"bold {PYGMENT_COLOR['blue']}", # key names + Keyword: PYGMENT_COLOR[color_list[1]] if color_list[1] != 'default' else PYGMENT_COLOR['brightblack'], # true, false, null + Number: PYGMENT_COLOR[color_list[2]] if color_list[2] != 'default' else PYGMENT_COLOR['magenta'], # numbers + String: PYGMENT_COLOR[color_list[3]] if color_list[3] != 'default' else PYGMENT_COLOR['green'] # strings + } + + +def piped_output(force_color): + """ + Return False if `STDOUT` is a TTY. True if output is being piped to + another program and foce_color is True. This allows forcing of ANSI + color codes even when using pipes. + """ + return not sys.stdout.isatty() and not force_color + + +def ctrlc(signum, frame): + """Exit with error on SIGINT""" + sys.exit(JC_ERROR_EXIT) + + +def parser_shortname(parser_arg): + """Return short name of the parser with dashes and no -- prefix""" + return parser_arg[2:] + + +def parsers_text(indent=0, pad=0, show_hidden=False): + """Return the argument and description information from each parser""" + ptext = '' + padding_char = ' ' + for p in all_parser_info(show_hidden=show_hidden): + parser_arg = p.get('argument', 'UNKNOWN') + padding = pad - len(parser_arg) + parser_desc = p.get('description', 'No description available.') + indent_text = padding_char * indent + padding_text = padding_char * padding + ptext += indent_text + parser_arg + padding_text + parser_desc + '\n' + + return ptext + + +def options_text(indent=0, pad=0): + """Return the argument and description information from each option""" + otext = '' + padding_char = ' ' + for option in long_options_map: + o_short = '-' + long_options_map[option][0] + o_desc = long_options_map[option][1] + o_combined = o_short + ', ' + option + padding = pad - len(o_combined) + indent_text = padding_char * indent + padding_text = padding_char * padding + otext += indent_text + o_combined + padding_text + o_desc + '\n' + + return otext + + +def about_jc(): + """Return jc info and the contents of each parser.info as a dictionary""" + return { + 'name': 'jc', + 'version': info.version, + 'description': info.description, + 'author': info.author, + 'author_email': info.author_email, + 'website': info.website, + 'copyright': info.copyright, + 'license': info.license, + 'python_version': '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro))), + 'python_path': sys.executable, + 'parser_count': len(parser_mod_list()), + 'standard_parser_count': len(standard_parser_mod_list()), + 'streaming_parser_count': len(streaming_parser_mod_list()), + 'plugin_parser_count': len(plugin_parser_mod_list()), + 'parsers': all_parser_info(show_hidden=True) + } + + +def helptext(show_hidden=False): + """Return the help text with the list of parsers""" + parsers_string = parsers_text(indent=4, pad=20, show_hidden=show_hidden) + options_string = options_text(indent=4, pad=20) + + helptext_string = f'''\ +jc converts the output of many commands, file-types, and strings to JSON or YAML + +Usage: + + Standard syntax: + + COMMAND | jc [OPTIONS] PARSER + + cat FILE | jc [OPTIONS] PARSER + + echo STRING | jc [OPTIONS] PARSER + + Magic syntax: + + jc [OPTIONS] COMMAND + + jc [OPTIONS] /proc/ + +Parsers: +{parsers_string} +Options: +{options_string} +Examples: + Standard Syntax: + $ dig www.google.com | jc --pretty --dig + $ cat /proc/meminfo | jc --pretty --proc + + Magic Syntax: + $ jc --pretty dig www.google.com + $ jc --pretty /proc/meminfo + + Parser Documentation: + $ jc --help --dig + + Show Hidden Parsers: + $ jc -hh +''' + + return helptext_string + + +def help_doc(options, show_hidden=False): + """ + Returns the parser documentation if a parser is found in the arguments, + otherwise the general help text is returned. + """ + for arg in options: + parser_name = parser_shortname(arg) + + if parser_name in parsers: + p_info = parser_info(arg, documentation=True) + compatible = ', '.join(p_info.get('compatible', ['unknown'])) + documentation = p_info.get('documentation', 'No documentation available.') + version = p_info.get('version', 'unknown') + author = p_info.get('author', 'unknown') + author_email = p_info.get('author_email', 'unknown') + doc_text = \ + f'{documentation}\n'\ + f'Compatibility: {compatible}\n\n'\ + f'Version {version} by {author} ({author_email})\n' + + utils._safe_pager(doc_text) + return + + utils._safe_print(helptext(show_hidden=show_hidden)) + return + + +def versiontext(): + """Return the version text""" + py_ver = '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro))) + versiontext_string = f'''\ + jc version: {info.version} + python interpreter version: {py_ver} + python path: {sys.executable} + + {info.website} + {info.copyright} + ''' + return textwrap.dedent(versiontext_string) + + +def yaml_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, ascii_only=False): + """ + Return a YAML formatted string. String may include color codes. If the + YAML library is not installed, output will fall back to JSON with a + warning message to STDERR""" + # make ruamel.yaml import optional + try: + from ruamel.yaml import YAML, representer + YAML_INSTALLED = True + except Exception: + YAML_INSTALLED = False + + if YAML_INSTALLED: + y_string_buf = io.BytesIO() + + # monkey patch to disable plugins since we don't use them and in + # ruamel.yaml versions prior to 0.17.0 the use of __file__ in the + # plugin code is incompatible with the pyoxidizer packager + YAML.official_plug_ins = lambda a: [] + + # monkey patch to disable aliases + representer.RoundTripRepresenter.ignore_aliases = lambda x, y: True + + yaml = YAML() + yaml.default_flow_style = False + yaml.explicit_start = True + yaml.allow_unicode = not ascii_only + yaml.encoding = 'utf-8' + yaml.dump(data, y_string_buf) + y_string = y_string_buf.getvalue().decode('utf-8')[:-1] + + if not mono and not piped_out: + # set colors + class JcStyle(Style): + styles = set_env_colors(env_colors) + + return str(highlight(y_string, YamlLexer(), Terminal256Formatter(style=JcStyle))[0:-1]) + + return y_string + + utils.warning_message(['YAML Library not installed. Reverting to JSON output.']) + return json_out(data, pretty=pretty, env_colors=env_colors, mono=mono, piped_out=piped_out, ascii_only=ascii_only) + + +def json_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, ascii_only=False): + """ + Return a JSON formatted string. String may include color codes or be + pretty printed. + """ + import json + + separators = (',', ':') + indent = None + + if pretty: + separators = None + indent = 2 + + j_string = json.dumps(data, indent=indent, separators=separators, ensure_ascii=ascii_only) + + if not mono and not piped_out: + # set colors + class JcStyle(Style): + styles = set_env_colors(env_colors) + + return str(highlight(j_string, JsonLexer(), Terminal256Formatter(style=JcStyle))[0:-1]) + + return j_string + + +def safe_print_out(list_or_dict, pretty=None, env_colors=None, mono=None, + piped_out=None, flush=None, yaml=None): + """Safely prints JSON or YAML output in both UTF-8 and ASCII systems""" + if yaml: + try: + print(yaml_out(list_or_dict, + pretty=pretty, + env_colors=env_colors, + mono=mono, + piped_out=piped_out), + flush=flush) + except UnicodeEncodeError: + print(yaml_out(list_or_dict, + pretty=pretty, + env_colors=env_colors, + mono=mono, + piped_out=piped_out, + ascii_only=True), + flush=flush) + + else: + try: + print(json_out(list_or_dict, + pretty=pretty, + env_colors=env_colors, + mono=mono, + piped_out=piped_out), + flush=flush) + except UnicodeEncodeError: + print(json_out(list_or_dict, + pretty=pretty, + env_colors=env_colors, + mono=mono, + piped_out=piped_out, + ascii_only=True), + flush=flush) + + +def magic_parser(args): + """ + Parse command arguments for magic syntax: jc -p ls -al + + Return a tuple: + valid_command (bool) is this a valid cmd? (exists in magic dict) + run_command (list) list of the user's cmd to run. None if no cmd. + jc_parser (str) parser to use for this user's cmd. + jc_options (list) list of jc options + """ + # bail immediately if there are no args or a parser is defined + if len(args) <= 1 or (args[1].startswith('--') and args[1] not in long_options_map): + return False, None, None, [] + + args_given = args[1:] + options = [] + + # find the options + for arg in list(args_given): + # long option found - populate option list + if arg in long_options_map: + options.extend(long_options_map[arg][0]) + args_given.pop(0) + continue + + # parser found - use standard syntax + if arg.startswith('--'): + return False, None, None, [] + + # option found - populate option list + if arg.startswith('-'): + options.extend(args_given.pop(0)[1:]) + + # command found if iterator didn't already stop - stop iterating + else: + break + + # if -h, -a, or -v found in options, then bail out + if 'h' in options or 'a' in options or 'v' in options: + return False, None, None, [] + + # all options popped and no command found - for case like 'jc -x' + if len(args_given) == 0: + return False, None, None, [] + + # create a dictionary of magic_commands to their respective parsers. + magic_dict = {} + for entry in all_parser_info(): + magic_dict.update({mc: entry['argument'] for mc in entry.get('magic_commands', [])}) + + # find the command and parser + one_word_command = args_given[0] + two_word_command = ' '.join(args_given[0:2]) + + # try to get a parser for two_word_command, otherwise get one for one_word_command + found_parser = magic_dict.get(two_word_command, magic_dict.get(one_word_command)) + + return ( + bool(found_parser), # was a suitable parser found? + args_given, # run_command + found_parser, # the parser selected + options # jc options to preserve + ) + + +def open_text_file(path_string): + with open(path_string, 'r') as f: + return f.read() + + +def run_user_command(command): + """ + Use subprocess to run the user's command. Returns the STDOUT, STDERR, + and the Exit Code as a tuple. + """ + proc = subprocess.Popen(command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=False, # Allows inheriting file descriptors; + universal_newlines=True, # useful for process substitution + encoding='UTF-8') + stdout, stderr = proc.communicate() + + return ( + stdout or '\n', + stderr, + proc.returncode + ) + + +def combined_exit_code(program_exit=0, jc_exit=0): + exit_code = program_exit + jc_exit + exit_code = min(exit_code, 255) + return exit_code + + +def add_metadata_to(list_or_dict, + runtime=None, + run_command=None, + magic_exit_code=None, + parser_name=None): + """ + This function mutates a list or dict in place. If the _jc_meta field + does not already exist, it will be created with the metadata fields. If + the _jc_meta field already exists, the metadata fields will be added to + the existing object. + + In the case of an empty list (no data), a dictionary with a _jc_meta + object will be added to the list. This way you always get metadata, + even if there are no results. + """ + run_timestamp = runtime.timestamp() + + meta_obj = { + 'parser': parser_name, + 'timestamp': run_timestamp + } + + if run_command: + meta_obj['magic_command'] = run_command + meta_obj['magic_command_exit'] = magic_exit_code + + if isinstance(list_or_dict, dict): + if '_jc_meta' not in list_or_dict: + list_or_dict['_jc_meta'] = {} + + list_or_dict['_jc_meta'].update(meta_obj) + + elif isinstance(list_or_dict, list): + if not list_or_dict: + list_or_dict.append({}) + + for item in list_or_dict: + if '_jc_meta' not in item: + item['_jc_meta'] = {} + + item['_jc_meta'].update(meta_obj) + + else: + utils.error_message(['Parser returned an unsupported object type.']) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + +def main(): + # break on ctrl-c keyboard interrupt + signal.signal(signal.SIGINT, ctrlc) + + # break on pipe error. need try/except for windows compatibility + try: + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + except AttributeError: + pass + + # enable colors for Windows cmd.exe terminal + if sys.platform.startswith('win32'): + os.system('') + + # parse magic syntax first: e.g. jc -p ls -al + magic_options = [] + valid_command, run_command, magic_found_parser, magic_options = magic_parser(sys.argv) + + # set colors + jc_colors = os.getenv('JC_COLORS') + + # set options + options = [] + options.extend(magic_options) + + # find options if magic_parser did not find a command + if not valid_command: + for opt in sys.argv: + if opt in long_options_map: + options.extend(long_options_map[opt][0]) + + if opt.startswith('-') and not opt.startswith('--'): + options.extend(opt[1:]) + + about = 'a' in options + debug = 'd' in options + verbose_debug = options.count('d') > 1 + force_color = 'C' in options + mono = ('m' in options or bool(os.getenv('NO_COLOR'))) and not force_color + help_me = 'h' in options + verbose_help = options.count('h') > 1 + pretty = 'p' in options + quiet = 'q' in options + ignore_exceptions = options.count('q') > 1 + raw = 'r' in options + meta_out = 'M' in options + unbuffer = 'u' in options + version_info = 'v' in options + yaml_out = 'y' in options + bash_comp = 'B' in options + zsh_comp = 'Z' in options + + if verbose_debug: + tracebackplus.enable(context=11) + + if not PYGMENTS_INSTALLED: + mono = True + + if about: + safe_print_out(about_jc(), + pretty=pretty, + env_colors=jc_colors, + mono=mono, + piped_out=piped_output(force_color), + yaml=yaml_out) + sys.exit(0) + + if help_me: + help_doc(sys.argv, show_hidden=verbose_help) + sys.exit(0) + + if version_info: + utils._safe_print(versiontext()) + sys.exit(0) + + if bash_comp: + utils._safe_print(bash_completion()) + sys.exit(0) + + if zsh_comp: + utils._safe_print(zsh_completion()) + sys.exit(0) + + # if magic syntax used, try to run the command and error if it's not found, etc. + magic_stdout, magic_stderr, magic_exit_code = None, None, 0 + run_command_str = '' + if run_command: + try: + run_command_str = shlex.join(run_command) # python 3.8+ + except AttributeError: + run_command_str = ' '.join(run_command) # older python versions + + if run_command_str.startswith('/proc'): + try: + magic_found_parser = 'proc' + magic_stdout = open_text_file(run_command_str) + + except OSError as e: + if debug: + raise + + error_msg = os.strerror(e.errno) + utils.error_message([ + f'"{run_command_str}" file could not be opened: {error_msg}.' + ]) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + except Exception: + if debug: + raise + + utils.error_message([ + f'"{run_command_str}" file could not be opened. For details use the -d or -dd option.' + ]) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + elif valid_command: + try: + magic_stdout, magic_stderr, magic_exit_code = run_user_command(run_command) + if magic_stderr: + utils._safe_print(magic_stderr[:-1], file=sys.stderr) + + except OSError as e: + if debug: + raise + + error_msg = os.strerror(e.errno) + utils.error_message([ + f'"{run_command_str}" command could not be run: {error_msg}.' + ]) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + except Exception: + if debug: + raise + + utils.error_message([ + f'"{run_command_str}" command could not be run. For details use the -d or -dd option.' + ]) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + elif run_command is not None: + utils.error_message([f'"{run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.']) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + # find the correct parser + if magic_found_parser: + parser = _get_parser(magic_found_parser) + parser_name = parser_shortname(magic_found_parser) + + else: + found = False + for arg in sys.argv: + parser_name = parser_shortname(arg) + + if parser_name in parsers: + parser = _get_parser(arg) + found = True + break + + if not found: + utils.error_message(['Missing or incorrect arguments. Use "jc -h" for help.']) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + if sys.stdin.isatty() and magic_stdout is None: + utils.error_message(['Missing piped data. Use "jc -h" for help.']) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + # parse and print to stdout + try: + # differentiate between regular and streaming parsers + + # streaming (only supports UTF-8 string data for now) + if _parser_is_streaming(parser): + result = parser.parse(sys.stdin, + raw=raw, + quiet=quiet, + ignore_exceptions=ignore_exceptions) + + for line in result: + if meta_out: + run_dt_utc = datetime.now(timezone.utc) + add_metadata_to(line, run_dt_utc, run_command, magic_exit_code, parser_name) + + safe_print_out(line, + pretty=pretty, + env_colors=jc_colors, + mono=mono, + piped_out=piped_output(force_color), + flush=unbuffer, + yaml=yaml_out) + + sys.exit(combined_exit_code(magic_exit_code, 0)) + + # regular (supports binary and UTF-8 string data) + else: + data = magic_stdout or sys.stdin.buffer.read() + + # convert to UTF-8, if possible. Otherwise, leave as bytes + try: + if isinstance(data, bytes): + data = data.decode('utf-8') + except UnicodeDecodeError: + pass + + result = parser.parse(data, + raw=raw, + quiet=quiet) + + if meta_out: + run_dt_utc = datetime.now(timezone.utc) + add_metadata_to(result, run_dt_utc, run_command, magic_exit_code, parser_name) + + safe_print_out(result, + pretty=pretty, + env_colors=jc_colors, + mono=mono, + piped_out=piped_output(force_color), + flush=unbuffer, + yaml=yaml_out) + + sys.exit(combined_exit_code(magic_exit_code, 0)) + + except (ParseError, LibraryNotInstalled) as e: + if debug: + raise + + utils.error_message([ + f'Parser issue with {parser_name}:', f'{e.__class__.__name__}: {e}', + 'If this is the correct parser, try setting the locale to C (LC_ALL=C).', + f'For details use the -d or -dd option. Use "jc -h --{parser_name}" for help.' + ]) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + except Exception: + if debug: + raise + + streaming_msg = '' + if _parser_is_streaming(parser): + streaming_msg = 'Use the -qq option to ignore streaming parser errors.' + + utils.error_message([ + f'{parser_name} parser could not parse the input data.', + f'{streaming_msg}', + 'If this is the correct parser, try setting the locale to C (LC_ALL=C).', + f'For details use the -d or -dd option. Use "jc -h --{parser_name}" for help.' + ]) + sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + + +if __name__ == '__main__': + main() diff --git a/jc/cli.py b/jc/cli.py index fb2ecbb2..05759bc1 100644 --- a/jc/cli.py +++ b/jc/cli.py @@ -10,6 +10,7 @@ import textwrap import signal import shlex import subprocess +from typing import List, Dict from .lib import (__version__, parser_info, all_parser_info, parsers, _get_parser, _parser_is_streaming, parser_mod_list, standard_parser_mod_list, plugin_parser_mod_list, @@ -32,8 +33,6 @@ try: except Exception: PYGMENTS_INSTALLED = False -JC_ERROR_EXIT = 100 - class info(): version = __version__ @@ -54,131 +53,183 @@ if PYGMENTS_INSTALLED: PYGMENT_COLOR = new_pygments_colors -def set_env_colors(env_colors=None): - """ - Return a dictionary to be used in Pygments custom style class. +class JcCli(): - Grab custom colors from JC_COLORS environment variable. JC_COLORS env - variable takes 4 comma separated string values and should be in the - format of: + def __init__(self) -> None: + self.data_in = None + self.data_out = None + self.current_out_line = None + self.options: List[str] = [] + self.args: List[str] = [] + self.parser_arg = None + self.parser_module = None + self.parser_name = None + self.indent = 0 + self.pad = 0 + self.env_colors: Dict = {} + self.documentation = False + self.show_hidden = False + self.piped_out = False + self.ascii_only = False + self.flush = False + self.json_separators = (',', ':') + self.json_indent = None + self.path_string = None + self.run_command_str = '' + self.command = None + self.program_exit = 0 + self.jc_exit = 0 + self.JC_ERROR_EXIT = 100 + self.exit_code = 0 + self.runtime: datetime = datetime.now() + self.run_timestamp = None - JC_COLORS=,,, + # cli options + self.about = False + self.debug = False + self.verbose_debug = False + self.force_color = False + self.mono = False + self.help_me = False + self.verbose_help = False + self.pretty = False + self.quiet = False + self.ignore_exceptions = False + self.raw = False + self.meta_out = False + self.unbuffer = False + self.version_info = False + self.yaml_output = False + self.bash_comp = False + self.zsh_comp = False - Where colors are: black, red, green, yellow, blue, magenta, cyan, gray, - brightblack, brightred, brightgreen, brightyellow, - brightblue, brightmagenta, brightcyan, white, default + # Magic options + self.valid_command = False + self.run_command = None + self.magic_found_parser = None + self.magic_options: List[str] = [] + self.magic_stdout = None, + self.magic_stderr = None, + self.magic_returncode = None - Default colors: - JC_COLORS=blue,brightblack,magenta,green - or - JC_COLORS=default,default,default,default - """ - input_error = False + def set_env_colors(self): + """ + Return a dictionary to be used in Pygments custom style class. - if env_colors: - color_list = env_colors.split(',') - else: - color_list = ['default', 'default', 'default', 'default'] + Grab custom colors from JC_COLORS environment variable. JC_COLORS env + variable takes 4 comma separated string values and should be in the + format of: - if len(color_list) != 4: - input_error = True + JC_COLORS=,,, - for color in color_list: - if color != 'default' and color not in PYGMENT_COLOR: + Where colors are: black, red, green, yellow, blue, magenta, cyan, gray, + brightblack, brightred, brightgreen, brightyellow, + brightblue, brightmagenta, brightcyan, white, default + + Default colors: + + JC_COLORS=blue,brightblack,magenta,green + or + JC_COLORS=default,default,default,default + """ + input_error = False + + if self.env_colors: + color_list = self.env_colors.split(',') + else: + color_list = ['default', 'default', 'default', 'default'] + + if len(color_list) != 4: input_error = True - # if there is an issue with the env variable, just set all colors to default and move on - if input_error: - utils.warning_message(['Could not parse JC_COLORS environment variable']) - color_list = ['default', 'default', 'default', 'default'] + for color in color_list: + if color != 'default' and color not in PYGMENT_COLOR: + input_error = True - # Try the color set in the JC_COLORS env variable first. If it is set to default, then fall back to default colors - return { - Name.Tag: f'bold {PYGMENT_COLOR[color_list[0]]}' if color_list[0] != 'default' else f"bold {PYGMENT_COLOR['blue']}", # key names - Keyword: PYGMENT_COLOR[color_list[1]] if color_list[1] != 'default' else PYGMENT_COLOR['brightblack'], # true, false, null - Number: PYGMENT_COLOR[color_list[2]] if color_list[2] != 'default' else PYGMENT_COLOR['magenta'], # numbers - String: PYGMENT_COLOR[color_list[3]] if color_list[3] != 'default' else PYGMENT_COLOR['green'] # strings - } + # if there is an issue with the env variable, just set all colors to default and move on + if input_error: + utils.warning_message(['Could not parse JC_COLORS environment variable']) + color_list = ['default', 'default', 'default', 'default'] + # Try the color set in the JC_COLORS env variable first. If it is set to default, then fall back to default colors + self.set_env_colors = { + Name.Tag: f'bold {PYGMENT_COLOR[color_list[0]]}' if color_list[0] != 'default' else f"bold {PYGMENT_COLOR['blue']}", # key names + Keyword: PYGMENT_COLOR[color_list[1]] if color_list[1] != 'default' else PYGMENT_COLOR['brightblack'], # true, false, null + Number: PYGMENT_COLOR[color_list[2]] if color_list[2] != 'default' else PYGMENT_COLOR['magenta'], # numbers + String: PYGMENT_COLOR[color_list[3]] if color_list[3] != 'default' else PYGMENT_COLOR['green'] # strings + } -def piped_output(force_color): - """ - Return False if `STDOUT` is a TTY. True if output is being piped to - another program and foce_color is True. This allows forcing of ANSI - color codes even when using pipes. - """ - return not sys.stdout.isatty() and not force_color + def piped_output(self): + """ + Return False if `STDOUT` is a TTY. True if output is being piped to + another program and foce_color is True. This allows forcing of ANSI + color codes even when using pipes. + """ + return not sys.stdout.isatty() and not self.force_color + def parser_shortname(self): + """Return short name of the parser with dashes and no -- prefix""" + return self.parser_arg[2:] -def ctrlc(signum, frame): - """Exit with error on SIGINT""" - sys.exit(JC_ERROR_EXIT) + def parsers_text(self): + """Return the argument and description information from each parser""" + ptext = '' + padding_char = ' ' + for p in all_parser_info(show_hidden=self.show_hidden): + parser_arg = p.get('argument', 'UNKNOWN') + padding = self.pad - len(parser_arg) + parser_desc = p.get('description', 'No description available.') + indent_text = padding_char * self.indent + padding_text = padding_char * padding + ptext += indent_text + parser_arg + padding_text + parser_desc + '\n' + return ptext -def parser_shortname(parser_arg): - """Return short name of the parser with dashes and no -- prefix""" - return parser_arg[2:] + def options_text(self): + """Return the argument and description information from each option""" + otext = '' + padding_char = ' ' + for option in long_options_map: + o_short = '-' + long_options_map[option][0] + o_desc = long_options_map[option][1] + o_combined = o_short + ', ' + option + padding = self.pad - len(o_combined) + indent_text = padding_char * self.indent + padding_text = padding_char * padding + otext += indent_text + o_combined + padding_text + o_desc + '\n' + return otext -def parsers_text(indent=0, pad=0, show_hidden=False): - """Return the argument and description information from each parser""" - ptext = '' - padding_char = ' ' - for p in all_parser_info(show_hidden=show_hidden): - parser_arg = p.get('argument', 'UNKNOWN') - padding = pad - len(parser_arg) - parser_desc = p.get('description', 'No description available.') - indent_text = padding_char * indent - padding_text = padding_char * padding - ptext += indent_text + parser_arg + padding_text + parser_desc + '\n' + @staticmethod + def about_jc(): + """Return jc info and the contents of each parser.info as a dictionary""" + return { + 'name': 'jc', + 'version': info.version, + 'description': info.description, + 'author': info.author, + 'author_email': info.author_email, + 'website': info.website, + 'copyright': info.copyright, + 'license': info.license, + 'python_version': '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro))), + 'python_path': sys.executable, + 'parser_count': len(parser_mod_list()), + 'standard_parser_count': len(standard_parser_mod_list()), + 'streaming_parser_count': len(streaming_parser_mod_list()), + 'plugin_parser_count': len(plugin_parser_mod_list()), + 'parsers': all_parser_info(show_hidden=True) + } - return ptext + def helptext(self): + """Return the help text with the list of parsers""" + self.indent = 4 + self.pad = 20 + parsers_string = self.parsers_text() + options_string = self.options_text() - -def options_text(indent=0, pad=0): - """Return the argument and description information from each option""" - otext = '' - padding_char = ' ' - for option in long_options_map: - o_short = '-' + long_options_map[option][0] - o_desc = long_options_map[option][1] - o_combined = o_short + ', ' + option - padding = pad - len(o_combined) - indent_text = padding_char * indent - padding_text = padding_char * padding - otext += indent_text + o_combined + padding_text + o_desc + '\n' - - return otext - - -def about_jc(): - """Return jc info and the contents of each parser.info as a dictionary""" - return { - 'name': 'jc', - 'version': info.version, - 'description': info.description, - 'author': info.author, - 'author_email': info.author_email, - 'website': info.website, - 'copyright': info.copyright, - 'license': info.license, - 'python_version': '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro))), - 'python_path': sys.executable, - 'parser_count': len(parser_mod_list()), - 'standard_parser_count': len(standard_parser_mod_list()), - 'streaming_parser_count': len(streaming_parser_mod_list()), - 'plugin_parser_count': len(plugin_parser_mod_list()), - 'parsers': all_parser_info(show_hidden=True) - } - - -def helptext(show_hidden=False): - """Return the help text with the list of parsers""" - parsers_string = parsers_text(indent=4, pad=20, show_hidden=show_hidden) - options_string = options_text(indent=4, pad=20) - - helptext_string = f'''\ + helptext_string = f'''\ jc converts the output of many commands, file-types, and strings to JSON or YAML Usage: @@ -217,304 +268,272 @@ Examples: $ jc -hh ''' - return helptext_string + return helptext_string + + def help_doc(self): + """ + Returns the parser documentation if a parser is found in the arguments, + otherwise the general help text is returned. + """ + for arg in self.options: + self.parser_arg = arg + parser_name = self.parser_shortname() + + if parser_name in parsers: + self.documentation = True + p_info = parser_info() + compatible = ', '.join(p_info.get('compatible', ['unknown'])) + docs = p_info.get('documentation', 'No documentation available.') + version = p_info.get('version', 'unknown') + author = p_info.get('author', 'unknown') + author_email = p_info.get('author_email', 'unknown') + doc_text = \ + f'{docs}\n'\ + f'Compatibility: {compatible}\n\n'\ + f'Version {version} by {author} ({author_email})\n' + + utils._safe_pager(doc_text) + return + + utils._safe_print(self.helptext()) + return + + @staticmethod + def versiontext(): + """Return the version text""" + py_ver = '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro))) + versiontext_string = f'''\ + jc version: {info.version} + python interpreter version: {py_ver} + python path: {sys.executable} + + {info.website} + {info.copyright} + ''' + return textwrap.dedent(versiontext_string) -def help_doc(options, show_hidden=False): - """ - Returns the parser documentation if a parser is found in the arguments, - otherwise the general help text is returned. - """ - for arg in options: - parser_name = parser_shortname(arg) + def yaml_out(self): + """ + Return a YAML formatted string. String may include color codes. If the + YAML library is not installed, output will fall back to JSON with a + warning message to STDERR""" + # make ruamel.yaml import optional + try: + from ruamel.yaml import YAML, representer + YAML_INSTALLED = True + except Exception: + YAML_INSTALLED = False - if parser_name in parsers: - p_info = parser_info(arg, documentation=True) - compatible = ', '.join(p_info.get('compatible', ['unknown'])) - documentation = p_info.get('documentation', 'No documentation available.') - version = p_info.get('version', 'unknown') - author = p_info.get('author', 'unknown') - author_email = p_info.get('author_email', 'unknown') - doc_text = \ - f'{documentation}\n'\ - f'Compatibility: {compatible}\n\n'\ - f'Version {version} by {author} ({author_email})\n' + if YAML_INSTALLED: + y_string_buf = io.BytesIO() - utils._safe_pager(doc_text) - return + # monkey patch to disable plugins since we don't use them and in + # ruamel.yaml versions prior to 0.17.0 the use of __file__ in the + # plugin code is incompatible with the pyoxidizer packager + YAML.official_plug_ins = lambda a: [] - utils._safe_print(helptext(show_hidden=show_hidden)) - return + # monkey patch to disable aliases + representer.RoundTripRepresenter.ignore_aliases = lambda x, y: True + yaml = YAML() + yaml.default_flow_style = False + yaml.explicit_start = True + yaml.allow_unicode = not self.ascii_only + yaml.encoding = 'utf-8' + yaml.dump(self.data_out, y_string_buf) + y_string = y_string_buf.getvalue().decode('utf-8')[:-1] -def versiontext(): - """Return the version text""" - py_ver = '.'.join((str(sys.version_info.major), str(sys.version_info.minor), str(sys.version_info.micro))) - versiontext_string = f'''\ - jc version: {info.version} - python interpreter version: {py_ver} - python path: {sys.executable} + if not self.mono and not self.piped_out: + # set colors + class JcStyle(Style): + styles = self.set_env_colors() - {info.website} - {info.copyright} - ''' - return textwrap.dedent(versiontext_string) + return str(highlight(y_string, YamlLexer(), Terminal256Formatter(style=JcStyle))[0:-1]) + return y_string -def yaml_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, ascii_only=False): - """ - Return a YAML formatted string. String may include color codes. If the - YAML library is not installed, output will fall back to JSON with a - warning message to STDERR""" - # make ruamel.yaml import optional - try: - from ruamel.yaml import YAML, representer - YAML_INSTALLED = True - except Exception: - YAML_INSTALLED = False + utils.warning_message(['YAML Library not installed. Reverting to JSON output.']) + return self.json_out() - if YAML_INSTALLED: - y_string_buf = io.BytesIO() + def json_out(self): + """ + Return a JSON formatted string. String may include color codes or be + pretty printed. + """ + import json - # monkey patch to disable plugins since we don't use them and in - # ruamel.yaml versions prior to 0.17.0 the use of __file__ in the - # plugin code is incompatible with the pyoxidizer packager - YAML.official_plug_ins = lambda a: [] + if self.pretty: + self.json_indent = 2 + self.json_separators = None - # monkey patch to disable aliases - representer.RoundTripRepresenter.ignore_aliases = lambda x, y: True + j_string = json.dumps(self.data_out, + indent=self.json_indent, + separators=self.json_separators, + ensure_ascii=self.ascii_only) - yaml = YAML() - yaml.default_flow_style = False - yaml.explicit_start = True - yaml.allow_unicode = not ascii_only - yaml.encoding = 'utf-8' - yaml.dump(data, y_string_buf) - y_string = y_string_buf.getvalue().decode('utf-8')[:-1] - - if not mono and not piped_out: + if not self.mono and not self.piped_out: # set colors class JcStyle(Style): - styles = set_env_colors(env_colors) + styles = self.set_env_colors() - return str(highlight(y_string, YamlLexer(), Terminal256Formatter(style=JcStyle))[0:-1]) + return str(highlight(j_string, JsonLexer(), Terminal256Formatter(style=JcStyle))[0:-1]) - return y_string + return j_string - utils.warning_message(['YAML Library not installed. Reverting to JSON output.']) - return json_out(data, pretty=pretty, env_colors=env_colors, mono=mono, piped_out=piped_out, ascii_only=ascii_only) + def safe_print_out(self): + """Safely prints JSON or YAML output in both UTF-8 and ASCII systems""" + if self.yaml_output: + try: + print(self.yaml_out(), flush=self.flush) + except UnicodeEncodeError: + self.ascii_only = True + print(self.yaml_out(), flush=self.flush) + else: + try: + print(self.json_out(), flush=self.flush) + except UnicodeEncodeError: + self.ascii_only = True + print(self.json_out(), flush=self.flush) -def json_out(data, pretty=False, env_colors=None, mono=False, piped_out=False, ascii_only=False): - """ - Return a JSON formatted string. String may include color codes or be - pretty printed. - """ - import json + def magic_parser(self): + """ + Parse command arguments for magic syntax: jc -p ls -al - separators = (',', ':') - indent = None + Return a tuple: + valid_command (bool) is this a valid cmd? (exists in magic dict) + run_command (list) list of the user's cmd to run. None if no cmd. + jc_parser (str) parser to use for this user's cmd. + jc_options (list) list of jc options + """ + # bail immediately if there are no args or a parser is defined + if len(self.args) <= 1 or (self.args[1].startswith('--') and self.args[1] not in long_options_map): + pass - if pretty: - separators = None - indent = 2 + args_given = self.args[1:] - j_string = json.dumps(data, indent=indent, separators=separators, ensure_ascii=ascii_only) + # find the options + for arg in list(args_given): + # long option found - populate option list + if arg in long_options_map: + self.magic_options.extend(long_options_map[arg][0]) + args_given.pop(0) + continue - if not mono and not piped_out: - # set colors - class JcStyle(Style): - styles = set_env_colors(env_colors) + # parser found - use standard syntax + if arg.startswith('--'): + return False, None, None, [] - return str(highlight(j_string, JsonLexer(), Terminal256Formatter(style=JcStyle))[0:-1]) + # option found - populate option list + if arg.startswith('-'): + self.magic_options.extend(args_given.pop(0)[1:]) - return j_string + # command found if iterator didn't already stop - stop iterating + else: + break - -def safe_print_out(list_or_dict, pretty=None, env_colors=None, mono=None, - piped_out=None, flush=None, yaml=None): - """Safely prints JSON or YAML output in both UTF-8 and ASCII systems""" - if yaml: - try: - print(yaml_out(list_or_dict, - pretty=pretty, - env_colors=env_colors, - mono=mono, - piped_out=piped_out), - flush=flush) - except UnicodeEncodeError: - print(yaml_out(list_or_dict, - pretty=pretty, - env_colors=env_colors, - mono=mono, - piped_out=piped_out, - ascii_only=True), - flush=flush) - - else: - try: - print(json_out(list_or_dict, - pretty=pretty, - env_colors=env_colors, - mono=mono, - piped_out=piped_out), - flush=flush) - except UnicodeEncodeError: - print(json_out(list_or_dict, - pretty=pretty, - env_colors=env_colors, - mono=mono, - piped_out=piped_out, - ascii_only=True), - flush=flush) - - -def magic_parser(args): - """ - Parse command arguments for magic syntax: jc -p ls -al - - Return a tuple: - valid_command (bool) is this a valid cmd? (exists in magic dict) - run_command (list) list of the user's cmd to run. None if no cmd. - jc_parser (str) parser to use for this user's cmd. - jc_options (list) list of jc options - """ - # bail immediately if there are no args or a parser is defined - if len(args) <= 1 or (args[1].startswith('--') and args[1] not in long_options_map): - return False, None, None, [] - - args_given = args[1:] - options = [] - - # find the options - for arg in list(args_given): - # long option found - populate option list - if arg in long_options_map: - options.extend(long_options_map[arg][0]) - args_given.pop(0) - continue - - # parser found - use standard syntax - if arg.startswith('--'): + # if -h, -a, or -v found in options, then bail out + if 'h' in self.magic_options or 'a' in self.magic_options or 'v' in self.magic_options: return False, None, None, [] - # option found - populate option list - if arg.startswith('-'): - options.extend(args_given.pop(0)[1:]) + # all options popped and no command found - for case like 'jc -x' + if len(args_given) == 0: + return False, None, None, [] + + # create a dictionary of magic_commands to their respective parsers. + magic_dict = {} + for entry in all_parser_info(): + magic_dict.update({mc: entry['argument'] for mc in entry.get('magic_commands', [])}) + + # find the command and parser + one_word_command = args_given[0] + two_word_command = ' '.join(args_given[0:2]) + + # try to get a parser for two_word_command, otherwise get one for one_word_command + self.magic_found_parser = magic_dict.get(two_word_command, magic_dict.get(one_word_command)) + + # set the instance variables + self.valid_command = bool(self.magic_found_parser) + self.run_command = args_given + + def open_text_file(self): + with open(self.path_string, 'r') as f: + return f.read() + + def run_user_command(self): + """ + Use subprocess to run the user's command. Returns the STDOUT, STDERR, + and the Exit Code as a tuple. + """ + proc = subprocess.Popen(self.command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=False, # Allows inheriting file descriptors; + universal_newlines=True, # useful for process substitution + encoding='UTF-8') + self.magic_stdout, self.magic_stderr = proc.communicate() + + self.magic_stdout = self.magic_stdout or '\n' + self.magic_returncode = proc.returncode + + def combined_exit_code(self): + self.exit_code = self.program_exit + self.jc_exit + self.exit_code = min(self.exit_code, 255) + + def add_metadata_to_output(self): + """ + This function mutates a list or dict in place. If the _jc_meta field + does not already exist, it will be created with the metadata fields. If + the _jc_meta field already exists, the metadata fields will be added to + the existing object. + + In the case of an empty list (no data), a dictionary with a _jc_meta + object will be added to the list. This way you always get metadata, + even if there are no results. + """ + self.run_timestamp = self.runtime.timestamp() + + meta_obj = { + 'parser': self.parser_name, + 'timestamp': self.run_timestamp + } + + if self.run_command: + meta_obj['magic_command'] = self.run_command + meta_obj['magic_command_exit'] = self.magic_exit_code + + if isinstance(self.data_out, dict): + if '_jc_meta' not in self.data_out: + self.data_out['_jc_meta'] = {} + + self.data_out['_jc_meta'].update(meta_obj) + + elif isinstance(self.data_out, list): + if not self.data_out: + self.data_out.append({}) + + for item in self.data_out: + if '_jc_meta' not in item: + item['_jc_meta'] = {} + + item['_jc_meta'].update(meta_obj) - # command found if iterator didn't already stop - stop iterating else: - break + utils.error_message(['Parser returned an unsupported object type.']) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) - # if -h, -a, or -v found in options, then bail out - if 'h' in options or 'a' in options or 'v' in options: - return False, None, None, [] - - # all options popped and no command found - for case like 'jc -x' - if len(args_given) == 0: - return False, None, None, [] - - # create a dictionary of magic_commands to their respective parsers. - magic_dict = {} - for entry in all_parser_info(): - magic_dict.update({mc: entry['argument'] for mc in entry.get('magic_commands', [])}) - - # find the command and parser - one_word_command = args_given[0] - two_word_command = ' '.join(args_given[0:2]) - - # try to get a parser for two_word_command, otherwise get one for one_word_command - found_parser = magic_dict.get(two_word_command, magic_dict.get(one_word_command)) - - return ( - bool(found_parser), # was a suitable parser found? - args_given, # run_command - found_parser, # the parser selected - options # jc options to preserve - ) - - -def open_text_file(path_string): - with open(path_string, 'r') as f: - return f.read() - - -def run_user_command(command): - """ - Use subprocess to run the user's command. Returns the STDOUT, STDERR, - and the Exit Code as a tuple. - """ - proc = subprocess.Popen(command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=False, # Allows inheriting file descriptors; - universal_newlines=True, # useful for process substitution - encoding='UTF-8') - stdout, stderr = proc.communicate() - - return ( - stdout or '\n', - stderr, - proc.returncode - ) - - -def combined_exit_code(program_exit=0, jc_exit=0): - exit_code = program_exit + jc_exit - exit_code = min(exit_code, 255) - return exit_code - - -def add_metadata_to(list_or_dict, - runtime=None, - run_command=None, - magic_exit_code=None, - parser_name=None): - """ - This function mutates a list or dict in place. If the _jc_meta field - does not already exist, it will be created with the metadata fields. If - the _jc_meta field already exists, the metadata fields will be added to - the existing object. - - In the case of an empty list (no data), a dictionary with a _jc_meta - object will be added to the list. This way you always get metadata, - even if there are no results. - """ - run_timestamp = runtime.timestamp() - - meta_obj = { - 'parser': parser_name, - 'timestamp': run_timestamp - } - - if run_command: - meta_obj['magic_command'] = run_command - meta_obj['magic_command_exit'] = magic_exit_code - - if isinstance(list_or_dict, dict): - if '_jc_meta' not in list_or_dict: - list_or_dict['_jc_meta'] = {} - - list_or_dict['_jc_meta'].update(meta_obj) - - elif isinstance(list_or_dict, list): - if not list_or_dict: - list_or_dict.append({}) - - for item in list_or_dict: - if '_jc_meta' not in item: - item['_jc_meta'] = {} - - item['_jc_meta'].update(meta_obj) - - else: - utils.error_message(['Parser returned an unsupported object type.']) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + def ctrlc(self, signum, frame): + """Exit with error on SIGINT""" + sys.exit(self.JC_ERROR_EXIT) def main(): + cli = JcCli() + # break on ctrl-c keyboard interrupt - signal.signal(signal.SIGINT, ctrlc) + signal.signal(signal.SIGINT, cli.ctrlc) # break on pipe error. need try/except for windows compatibility try: @@ -527,240 +546,233 @@ def main(): os.system('') # parse magic syntax first: e.g. jc -p ls -al - magic_options = [] - valid_command, run_command, magic_found_parser, magic_options = magic_parser(sys.argv) + cli.args = sys.argv + cli.magic_parser() # set colors - jc_colors = os.getenv('JC_COLORS') + cli.env_colors = os.getenv('JC_COLORS') # set options - options = [] - options.extend(magic_options) + cli.options.extend(cli.magic_options) # find options if magic_parser did not find a command - if not valid_command: - for opt in sys.argv: + if not cli.valid_command: + for opt in cli.args: if opt in long_options_map: - options.extend(long_options_map[opt][0]) + cli.options.extend(long_options_map[opt][0]) if opt.startswith('-') and not opt.startswith('--'): - options.extend(opt[1:]) + cli.options.extend(opt[1:]) - about = 'a' in options - debug = 'd' in options - verbose_debug = options.count('d') > 1 - force_color = 'C' in options - mono = ('m' in options or bool(os.getenv('NO_COLOR'))) and not force_color - help_me = 'h' in options - verbose_help = options.count('h') > 1 - pretty = 'p' in options - quiet = 'q' in options - ignore_exceptions = options.count('q') > 1 - raw = 'r' in options - meta_out = 'M' in options - unbuffer = 'u' in options - version_info = 'v' in options - yaml_out = 'y' in options - bash_comp = 'B' in options - zsh_comp = 'Z' in options + cli.about = 'a' in cli.options + cli.debug = 'd' in cli.options + cli.verbose_debug = cli.options.count('d') > 1 + cli.force_color = 'C' in cli.options + cli.mono = ('m' in cli.options or bool(os.getenv('NO_COLOR'))) and not cli.force_color + cli.help_me = 'h' in cli.options + cli.verbose_help = cli.options.count('h') > 1 + cli.pretty = 'p' in cli.options + cli.quiet = 'q' in cli.options + cli.ignore_exceptions = cli.options.count('q') > 1 + cli.raw = 'r' in cli.options + cli.meta_out = 'M' in cli.options + cli.unbuffer = 'u' in cli.options + cli.version_info = 'v' in cli.options + cli.yaml_output = 'y' in cli.options + cli.bash_comp = 'B' in cli.options + cli.zsh_comp = 'Z' in cli.options - if verbose_debug: + if cli.verbose_debug: tracebackplus.enable(context=11) if not PYGMENTS_INSTALLED: - mono = True + cli.mono = True - if about: - safe_print_out(about_jc(), - pretty=pretty, - env_colors=jc_colors, - mono=mono, - piped_out=piped_output(force_color), - yaml=yaml_out) + if cli.about: + cli.data_out = cli.about_jc() + cli.safe_print_out() sys.exit(0) - if help_me: - help_doc(sys.argv, show_hidden=verbose_help) + if cli.help_me: + cli.help_doc() sys.exit(0) - if version_info: - utils._safe_print(versiontext()) + if cli.version_info: + utils._safe_print(cli.versiontext()) sys.exit(0) - if bash_comp: + if cli.bash_comp: utils._safe_print(bash_completion()) sys.exit(0) - if zsh_comp: + if cli.zsh_comp: utils._safe_print(zsh_completion()) sys.exit(0) # if magic syntax used, try to run the command and error if it's not found, etc. - magic_stdout, magic_stderr, magic_exit_code = None, None, 0 - run_command_str = '' - if run_command: + if cli.run_command: try: - run_command_str = shlex.join(run_command) # python 3.8+ + cli.run_command_str = shlex.join(cli.run_command) # python 3.8+ except AttributeError: - run_command_str = ' '.join(run_command) # older python versions + cli.run_command_str = ' '.join(cli.run_command) # older python versions - if run_command_str.startswith('/proc'): + if cli.run_command_str.startswith('/proc'): try: - magic_found_parser = 'proc' - magic_stdout = open_text_file(run_command_str) + cli.magic_found_parser = 'proc' + cli.stdout = cli.open_text_file() except OSError as e: - if debug: + if cli.debug: raise error_msg = os.strerror(e.errno) utils.error_message([ - f'"{run_command_str}" file could not be opened: {error_msg}.' + f'"{cli.run_command_str}" file could not be opened: {error_msg}.' ]) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + cli.jc_error = cli.JC_ERROR_EXIT + sys.exit(cli.combined_exit_code()) except Exception: - if debug: + if cli.debug: raise utils.error_message([ - f'"{run_command_str}" file could not be opened. For details use the -d or -dd option.' + f'"{cli.run_command_str}" file could not be opened. For details use the -d or -dd option.' ]) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + cli.jc_error = cli.JC_ERROR_EXIT + sys.exit(cli.combined_exit_code()) - elif valid_command: + elif cli.valid_command: try: - magic_stdout, magic_stderr, magic_exit_code = run_user_command(run_command) - if magic_stderr: - utils._safe_print(magic_stderr[:-1], file=sys.stderr) + cli.run_user_command() + if cli.magic_stderr: + utils._safe_print(cli.magic_stderr[:-1], file=sys.stderr) except OSError as e: - if debug: + if cli.debug: raise error_msg = os.strerror(e.errno) utils.error_message([ - f'"{run_command_str}" command could not be run: {error_msg}.' + f'"{cli.run_command_str}" command could not be run: {error_msg}.' ]) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + cli.jc_error = cli.JC_ERROR_EXIT + sys.exit(cli.combined_exit_code()) except Exception: - if debug: + if cli.debug: raise utils.error_message([ - f'"{run_command_str}" command could not be run. For details use the -d or -dd option.' + f'"{cli.run_command_str}" command could not be run. For details use the -d or -dd option.' ]) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + cli.jc_error = cli.JC_ERROR_EXIT + sys.exit(cli.combined_exit_code()) - elif run_command is not None: - utils.error_message([f'"{run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.']) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + elif cli.run_command is not None: + utils.error_message([f'"{cli.run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.']) + sys.exit(cli.combined_exit_code()) # find the correct parser - if magic_found_parser: - parser = _get_parser(magic_found_parser) - parser_name = parser_shortname(magic_found_parser) + if cli.magic_found_parser: + cli.parser_module = _get_parser(cli.magic_found_parser) + cli.parser_name = cli.parser_shortname() else: found = False - for arg in sys.argv: - parser_name = parser_shortname(arg) + for arg in cli.args: + cli.parser_arg = arg + cli.parser_name = cli.parser_shortname() - if parser_name in parsers: - parser = _get_parser(arg) + if cli.parser_name in parsers: + cli.parser_module = _get_parser(cli.parser_arg) found = True break if not found: utils.error_message(['Missing or incorrect arguments. Use "jc -h" for help.']) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + cli.jc_error = cli.JC_ERROR_EXIT + sys.exit(cli.combined_exit_code()) - if sys.stdin.isatty() and magic_stdout is None: + if sys.stdin.isatty() and cli.stdout is None: utils.error_message(['Missing piped data. Use "jc -h" for help.']) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + cli.jc_error = cli.JC_ERROR_EXIT + sys.exit(cli.combined_exit_code()) # parse and print to stdout try: # differentiate between regular and streaming parsers # streaming (only supports UTF-8 string data for now) - if _parser_is_streaming(parser): - result = parser.parse(sys.stdin, - raw=raw, - quiet=quiet, - ignore_exceptions=ignore_exceptions) + if _parser_is_streaming(cli.parser_module): + cli.data_in = sys.stdin + result = cli.parser_module.parse(cli.data_in, + raw=cli.raw, + quiet=cli.quiet, + ignore_exceptions=cli.ignore_exceptions) for line in result: - if meta_out: - run_dt_utc = datetime.now(timezone.utc) - add_metadata_to(line, run_dt_utc, run_command, magic_exit_code, parser_name) + cli.data_out = line + if cli.meta_out: + cli.run_timestamp = datetime.now(timezone.utc) + cli.add_metadata_to_output() - safe_print_out(line, - pretty=pretty, - env_colors=jc_colors, - mono=mono, - piped_out=piped_output(force_color), - flush=unbuffer, - yaml=yaml_out) + cli.safe_print_out() - sys.exit(combined_exit_code(magic_exit_code, 0)) + sys.exit(cli.combined_exit_code()) # regular (supports binary and UTF-8 string data) else: - data = magic_stdout or sys.stdin.buffer.read() + print(cli.stdout) + cli.data_in = cli.stdout or sys.stdin.buffer.read() # convert to UTF-8, if possible. Otherwise, leave as bytes try: - if isinstance(data, bytes): - data = data.decode('utf-8') + if isinstance(cli.data_in, bytes): + cli.data_in = cli.data_in.decode('utf-8') except UnicodeDecodeError: pass - result = parser.parse(data, - raw=raw, - quiet=quiet) + cli.data_out = cli.parser_module.parse(cli.data_in, + raw=cli.raw, + quiet=cli.quiet) - if meta_out: - run_dt_utc = datetime.now(timezone.utc) - add_metadata_to(result, run_dt_utc, run_command, magic_exit_code, parser_name) + if cli.meta_out: + cli.run_timestamp = datetime.now(timezone.utc) + cli.add_metadata_to_output() - safe_print_out(result, - pretty=pretty, - env_colors=jc_colors, - mono=mono, - piped_out=piped_output(force_color), - flush=unbuffer, - yaml=yaml_out) + cli.safe_print_out() - sys.exit(combined_exit_code(magic_exit_code, 0)) + sys.exit(cli.combined_exit_code()) except (ParseError, LibraryNotInstalled) as e: - if debug: + if cli.debug: raise utils.error_message([ - f'Parser issue with {parser_name}:', f'{e.__class__.__name__}: {e}', + f'Parser issue with {cli.parser_name}:', f'{e.__class__.__name__}: {e}', 'If this is the correct parser, try setting the locale to C (LC_ALL=C).', - f'For details use the -d or -dd option. Use "jc -h --{parser_name}" for help.' + f'For details use the -d or -dd option. Use "jc -h --{cli.parser_name}" for help.' ]) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + cli.jc_error = cli.JC_ERROR_EXIT + sys.exit(cli.combined_exit_code()) except Exception: - if debug: + if cli.debug: raise streaming_msg = '' - if _parser_is_streaming(parser): + if _parser_is_streaming(cli.parser_module): streaming_msg = 'Use the -qq option to ignore streaming parser errors.' utils.error_message([ - f'{parser_name} parser could not parse the input data.', + f'{cli.parser_name} parser could not parse the input data.', f'{streaming_msg}', 'If this is the correct parser, try setting the locale to C (LC_ALL=C).', - f'For details use the -d or -dd option. Use "jc -h --{parser_name}" for help.' + f'For details use the -d or -dd option. Use "jc -h --{cli.parser_name}" for help.' ]) - sys.exit(combined_exit_code(magic_exit_code, JC_ERROR_EXIT)) + cli.jc_error = cli.JC_ERROR_EXIT + sys.exit(cli.combined_exit_code()) if __name__ == '__main__':