From 557afc95bd496647a7128556152f696576715445 Mon Sep 17 00:00:00 2001 From: Kelly Brazil Date: Sun, 2 Oct 2022 18:20:14 -0700 Subject: [PATCH] initial working --- jc/cli.py | 536 +++++++++++++++++++++++++++--------------------------- 1 file changed, 269 insertions(+), 267 deletions(-) diff --git a/jc/cli.py b/jc/cli.py index 05759bc1..94502843 100644 --- a/jc/cli.py +++ b/jc/cli.py @@ -61,12 +61,12 @@ class JcCli(): self.current_out_line = None self.options: List[str] = [] self.args: List[str] = [] - self.parser_arg = None self.parser_module = None self.parser_name = None self.indent = 0 self.pad = 0 - self.env_colors: Dict = {} + self.env_colors = None + self.custom_colors: Dict = {} self.documentation = False self.show_hidden = False self.piped_out = False @@ -75,9 +75,6 @@ class JcCli(): self.json_separators = (',', ':') self.json_indent = None self.path_string = None - self.run_command_str = '' - self.command = None - self.program_exit = 0 self.jc_exit = 0 self.JC_ERROR_EXIT = 100 self.exit_code = 0 @@ -106,14 +103,15 @@ class JcCli(): # Magic options self.valid_command = False self.run_command = None + self.run_command_str = '' self.magic_found_parser = None self.magic_options: List[str] = [] - self.magic_stdout = None, - self.magic_stderr = None, + self.magic_stdout = None + self.magic_stderr = None self.magic_returncode = None - def set_env_colors(self): + def set_custom_colors(self): """ Return a dictionary to be used in Pygments custom style class. @@ -153,24 +151,26 @@ class JcCli(): color_list = ['default', 'default', 'default', 'default'] # Try the color set in the JC_COLORS env variable first. If it is set to default, then fall back to default colors - self.set_env_colors = { + self.custom_colors = { Name.Tag: f'bold {PYGMENT_COLOR[color_list[0]]}' if color_list[0] != 'default' else f"bold {PYGMENT_COLOR['blue']}", # key names Keyword: PYGMENT_COLOR[color_list[1]] if color_list[1] != 'default' else PYGMENT_COLOR['brightblack'], # true, false, null Number: PYGMENT_COLOR[color_list[2]] if color_list[2] != 'default' else PYGMENT_COLOR['magenta'], # numbers String: PYGMENT_COLOR[color_list[3]] if color_list[3] != 'default' else PYGMENT_COLOR['green'] # strings } - def piped_output(self): + def set_piped_out(self): """ Return False if `STDOUT` is a TTY. True if output is being piped to another program and foce_color is True. This allows forcing of ANSI color codes even when using pipes. """ - return not sys.stdout.isatty() and not self.force_color + if not sys.stdout.isatty() and not self.force_color: + self.piped_out = True - def parser_shortname(self): + @staticmethod + def parser_shortname(parser_arg): """Return short name of the parser with dashes and no -- prefix""" - return self.parser_arg[2:] + return parser_arg[2:] def parsers_text(self): """Return the argument and description information from each parser""" @@ -276,8 +276,7 @@ Examples: otherwise the general help text is returned. """ for arg in self.options: - self.parser_arg = arg - parser_name = self.parser_shortname() + parser_name = self.parser_shortname(arg) if parser_name in parsers: self.documentation = True @@ -347,7 +346,7 @@ Examples: if not self.mono and not self.piped_out: # set colors class JcStyle(Style): - styles = self.set_env_colors() + styles = self.custom_colors return str(highlight(y_string, YamlLexer(), Terminal256Formatter(style=JcStyle))[0:-1]) @@ -375,7 +374,7 @@ Examples: if not self.mono and not self.piped_out: # set colors class JcStyle(Style): - styles = self.set_env_colors() + styles = self.custom_colors return str(highlight(j_string, JsonLexer(), Terminal256Formatter(style=JcStyle))[0:-1]) @@ -409,7 +408,7 @@ Examples: """ # bail immediately if there are no args or a parser is defined if len(self.args) <= 1 or (self.args[1].startswith('--') and self.args[1] not in long_options_map): - pass + return args_given = self.args[1:] @@ -423,7 +422,7 @@ Examples: # parser found - use standard syntax if arg.startswith('--'): - return False, None, None, [] + return # option found - populate option list if arg.startswith('-'): @@ -435,11 +434,11 @@ Examples: # if -h, -a, or -v found in options, then bail out if 'h' in self.magic_options or 'a' in self.magic_options or 'v' in self.magic_options: - return False, None, None, [] + return # all options popped and no command found - for case like 'jc -x' if len(args_given) == 0: - return False, None, None, [] + return # create a dictionary of magic_commands to their respective parsers. magic_dict = {} @@ -466,7 +465,7 @@ Examples: Use subprocess to run the user's command. Returns the STDOUT, STDERR, and the Exit Code as a tuple. """ - proc = subprocess.Popen(self.command, + proc = subprocess.Popen(self.run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=False, # Allows inheriting file descriptors; @@ -478,7 +477,7 @@ Examples: self.magic_returncode = proc.returncode def combined_exit_code(self): - self.exit_code = self.program_exit + self.jc_exit + self.exit_code = self.magic_returncode + self.jc_exit self.exit_code = min(self.exit_code, 255) def add_metadata_to_output(self): @@ -501,7 +500,7 @@ Examples: if self.run_command: meta_obj['magic_command'] = self.run_command - meta_obj['magic_command_exit'] = self.magic_exit_code + meta_obj['magic_command_exit'] = self.magic_returncode if isinstance(self.data_out, dict): if '_jc_meta' not in self.data_out: @@ -528,251 +527,254 @@ Examples: """Exit with error on SIGINT""" sys.exit(self.JC_ERROR_EXIT) + def run(self): + + # break on ctrl-c keyboard interrupt + signal.signal(signal.SIGINT, self.ctrlc) + + # break on pipe error. need try/except for windows compatibility + try: + signal.signal(signal.SIGPIPE, signal.SIG_DFL) + except AttributeError: + pass + + # enable colors for Windows cmd.exe terminal + if sys.platform.startswith('win32'): + os.system('') + + # parse magic syntax first: e.g. jc -p ls -al + self.args = sys.argv + self.magic_parser() + + # set colors + self.env_colors = os.getenv('JC_COLORS') + + # set options + self.options.extend(self.magic_options) + + # find options if magic_parser did not find a command + if not self.valid_command: + for opt in self.args: + if opt in long_options_map: + self.options.extend(long_options_map[opt][0]) + + if opt.startswith('-') and not opt.startswith('--'): + self.options.extend(opt[1:]) + + self.about = 'a' in self.options + self.debug = 'd' in self.options + self.verbose_debug = self.options.count('d') > 1 + self.force_color = 'C' in self.options + self.mono = ('m' in self.options or bool(os.getenv('NO_COLOR'))) and not self.force_color + self.help_me = 'h' in self.options + self.verbose_help = self.options.count('h') > 1 + self.pretty = 'p' in self.options + self.quiet = 'q' in self.options + self.ignore_exceptions = self.options.count('q') > 1 + self.raw = 'r' in self.options + self.meta_out = 'M' in self.options + self.unbuffer = 'u' in self.options + self.version_info = 'v' in self.options + self.yaml_output = 'y' in self.options + self.bash_comp = 'B' in self.options + self.zsh_comp = 'Z' in self.options + + self.set_piped_out() + self.set_custom_colors() + + if self.verbose_debug: + tracebackplus.enable(context=11) + + if not PYGMENTS_INSTALLED: + self.mono = True + + if self.about: + self.data_out = self.about_jc() + self.safe_print_out() + sys.exit(0) + + if self.help_me: + self.help_doc() + sys.exit(0) + + if self.version_info: + utils._safe_print(self.versiontext()) + sys.exit(0) + + if self.bash_comp: + utils._safe_print(bash_completion()) + sys.exit(0) + + if self.zsh_comp: + utils._safe_print(zsh_completion()) + sys.exit(0) + + # if magic syntax used, try to run the command and error if it's not found, etc. + if self.run_command: + try: + self.run_command_str = shlex.join(self.run_command) # python 3.8+ + except AttributeError: + self.run_command_str = ' '.join(self.run_command) # older python versions + + if self.run_command_str.startswith('/proc'): + try: + self.magic_found_parser = 'proc' + self.magic_stdout = self.open_text_file() + + except OSError as e: + if self.debug: + raise + + error_msg = os.strerror(e.errno) + utils.error_message([ + f'"{self.run_command_str}" file could not be opened: {error_msg}.' + ]) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) + + except Exception: + if self.debug: + raise + + utils.error_message([ + f'"{self.run_command_str}" file could not be opened. For details use the -d or -dd option.' + ]) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) + + elif self.valid_command: + try: + self.run_user_command() + if self.magic_stderr: + utils._safe_print(self.magic_stderr[:-1], file=sys.stderr) + + except OSError as e: + if self.debug: + raise + + error_msg = os.strerror(e.errno) + utils.error_message([ + f'"{self.run_command_str}" command could not be run: {error_msg}.' + ]) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) + + except Exception: + if self.debug: + raise + + utils.error_message([ + f'"{self.run_command_str}" command could not be run. For details use the -d or -dd option.' + ]) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) + + elif self.run_command is not None: + utils.error_message([f'"{self.run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.']) + sys.exit(self.combined_exit_code()) + + # find the correct parser + if self.magic_found_parser: + self.parser_module = _get_parser(self.magic_found_parser) + self.parser_name = self.parser_shortname(self.magic_found_parser) + + else: + found = False + for arg in self.args: + self.parser_name = self.parser_shortname(arg) + + if self.parser_name in parsers: + self.parser_module = _get_parser(arg) + found = True + break + + if not found: + utils.error_message(['Missing or incorrect arguments. Use "jc -h" for help.']) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) + + if sys.stdin.isatty() and self.magic_stdout is None: + utils.error_message(['Missing piped data. Use "jc -h" for help.']) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) + + # parse and print to stdout + try: + # differentiate between regular and streaming parsers + + # streaming (only supports UTF-8 string data for now) + if _parser_is_streaming(self.parser_module): + self.data_in = sys.stdin + result = self.parser_module.parse(self.data_in, + raw=self.raw, + quiet=self.quiet, + ignore_exceptions=self.ignore_exceptions) + + for line in result: + self.data_out = line + if self.meta_out: + self.run_timestamp = datetime.now(timezone.utc) + self.add_metadata_to_output() + + self.safe_print_out() + + sys.exit(self.combined_exit_code()) + + # regular (supports binary and UTF-8 string data) + else: + self.data_in = self.magic_stdout or sys.stdin.buffer.read() + + # convert to UTF-8, if possible. Otherwise, leave as bytes + try: + if isinstance(self.data_in, bytes): + self.data_in = self.data_in.decode('utf-8') + except UnicodeDecodeError: + pass + + self.data_out = self.parser_module.parse(self.data_in, + raw=self.raw, + quiet=self.quiet) + + if self.meta_out: + self.run_timestamp = datetime.now(timezone.utc) + self.add_metadata_to_output() + + self.safe_print_out() + + sys.exit(self.combined_exit_code()) + + except (ParseError, LibraryNotInstalled) as e: + if self.debug: + raise + + utils.error_message([ + f'Parser issue with {self.parser_name}:', f'{e.__class__.__name__}: {e}', + 'If this is the correct parser, try setting the locale to C (LC_ALL=C).', + f'For details use the -d or -dd option. Use "jc -h --{self.parser_name}" for help.' + ]) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) + + except Exception: + if self.debug: + raise + + streaming_msg = '' + if _parser_is_streaming(self.parser_module): + streaming_msg = 'Use the -qq option to ignore streaming parser errors.' + + utils.error_message([ + f'{self.parser_name} parser could not parse the input data.', + f'{streaming_msg}', + 'If this is the correct parser, try setting the locale to C (LC_ALL=C).', + f'For details use the -d or -dd option. Use "jc -h --{self.parser_name}" for help.' + ]) + self.jc_error = self.JC_ERROR_EXIT + sys.exit(self.combined_exit_code()) + def main(): - cli = JcCli() - - # break on ctrl-c keyboard interrupt - signal.signal(signal.SIGINT, cli.ctrlc) - - # break on pipe error. need try/except for windows compatibility - try: - signal.signal(signal.SIGPIPE, signal.SIG_DFL) - except AttributeError: - pass - - # enable colors for Windows cmd.exe terminal - if sys.platform.startswith('win32'): - os.system('') - - # parse magic syntax first: e.g. jc -p ls -al - cli.args = sys.argv - cli.magic_parser() - - # set colors - cli.env_colors = os.getenv('JC_COLORS') - - # set options - cli.options.extend(cli.magic_options) - - # find options if magic_parser did not find a command - if not cli.valid_command: - for opt in cli.args: - if opt in long_options_map: - cli.options.extend(long_options_map[opt][0]) - - if opt.startswith('-') and not opt.startswith('--'): - cli.options.extend(opt[1:]) - - cli.about = 'a' in cli.options - cli.debug = 'd' in cli.options - cli.verbose_debug = cli.options.count('d') > 1 - cli.force_color = 'C' in cli.options - cli.mono = ('m' in cli.options or bool(os.getenv('NO_COLOR'))) and not cli.force_color - cli.help_me = 'h' in cli.options - cli.verbose_help = cli.options.count('h') > 1 - cli.pretty = 'p' in cli.options - cli.quiet = 'q' in cli.options - cli.ignore_exceptions = cli.options.count('q') > 1 - cli.raw = 'r' in cli.options - cli.meta_out = 'M' in cli.options - cli.unbuffer = 'u' in cli.options - cli.version_info = 'v' in cli.options - cli.yaml_output = 'y' in cli.options - cli.bash_comp = 'B' in cli.options - cli.zsh_comp = 'Z' in cli.options - - if cli.verbose_debug: - tracebackplus.enable(context=11) - - if not PYGMENTS_INSTALLED: - cli.mono = True - - if cli.about: - cli.data_out = cli.about_jc() - cli.safe_print_out() - sys.exit(0) - - if cli.help_me: - cli.help_doc() - sys.exit(0) - - if cli.version_info: - utils._safe_print(cli.versiontext()) - sys.exit(0) - - if cli.bash_comp: - utils._safe_print(bash_completion()) - sys.exit(0) - - if cli.zsh_comp: - utils._safe_print(zsh_completion()) - sys.exit(0) - - # if magic syntax used, try to run the command and error if it's not found, etc. - if cli.run_command: - try: - cli.run_command_str = shlex.join(cli.run_command) # python 3.8+ - except AttributeError: - cli.run_command_str = ' '.join(cli.run_command) # older python versions - - if cli.run_command_str.startswith('/proc'): - try: - cli.magic_found_parser = 'proc' - cli.stdout = cli.open_text_file() - - except OSError as e: - if cli.debug: - raise - - error_msg = os.strerror(e.errno) - utils.error_message([ - f'"{cli.run_command_str}" file could not be opened: {error_msg}.' - ]) - cli.jc_error = cli.JC_ERROR_EXIT - sys.exit(cli.combined_exit_code()) - - except Exception: - if cli.debug: - raise - - utils.error_message([ - f'"{cli.run_command_str}" file could not be opened. For details use the -d or -dd option.' - ]) - cli.jc_error = cli.JC_ERROR_EXIT - sys.exit(cli.combined_exit_code()) - - elif cli.valid_command: - try: - cli.run_user_command() - if cli.magic_stderr: - utils._safe_print(cli.magic_stderr[:-1], file=sys.stderr) - - except OSError as e: - if cli.debug: - raise - - error_msg = os.strerror(e.errno) - utils.error_message([ - f'"{cli.run_command_str}" command could not be run: {error_msg}.' - ]) - cli.jc_error = cli.JC_ERROR_EXIT - sys.exit(cli.combined_exit_code()) - - except Exception: - if cli.debug: - raise - - utils.error_message([ - f'"{cli.run_command_str}" command could not be run. For details use the -d or -dd option.' - ]) - cli.jc_error = cli.JC_ERROR_EXIT - sys.exit(cli.combined_exit_code()) - - elif cli.run_command is not None: - utils.error_message([f'"{cli.run_command_str}" cannot be used with Magic syntax. Use "jc -h" for help.']) - sys.exit(cli.combined_exit_code()) - - # find the correct parser - if cli.magic_found_parser: - cli.parser_module = _get_parser(cli.magic_found_parser) - cli.parser_name = cli.parser_shortname() - - else: - found = False - for arg in cli.args: - cli.parser_arg = arg - cli.parser_name = cli.parser_shortname() - - if cli.parser_name in parsers: - cli.parser_module = _get_parser(cli.parser_arg) - found = True - break - - if not found: - utils.error_message(['Missing or incorrect arguments. Use "jc -h" for help.']) - cli.jc_error = cli.JC_ERROR_EXIT - sys.exit(cli.combined_exit_code()) - - if sys.stdin.isatty() and cli.stdout is None: - utils.error_message(['Missing piped data. Use "jc -h" for help.']) - cli.jc_error = cli.JC_ERROR_EXIT - sys.exit(cli.combined_exit_code()) - - # parse and print to stdout - try: - # differentiate between regular and streaming parsers - - # streaming (only supports UTF-8 string data for now) - if _parser_is_streaming(cli.parser_module): - cli.data_in = sys.stdin - result = cli.parser_module.parse(cli.data_in, - raw=cli.raw, - quiet=cli.quiet, - ignore_exceptions=cli.ignore_exceptions) - - for line in result: - cli.data_out = line - if cli.meta_out: - cli.run_timestamp = datetime.now(timezone.utc) - cli.add_metadata_to_output() - - cli.safe_print_out() - - sys.exit(cli.combined_exit_code()) - - # regular (supports binary and UTF-8 string data) - else: - print(cli.stdout) - cli.data_in = cli.stdout or sys.stdin.buffer.read() - - # convert to UTF-8, if possible. Otherwise, leave as bytes - try: - if isinstance(cli.data_in, bytes): - cli.data_in = cli.data_in.decode('utf-8') - except UnicodeDecodeError: - pass - - cli.data_out = cli.parser_module.parse(cli.data_in, - raw=cli.raw, - quiet=cli.quiet) - - if cli.meta_out: - cli.run_timestamp = datetime.now(timezone.utc) - cli.add_metadata_to_output() - - cli.safe_print_out() - - sys.exit(cli.combined_exit_code()) - - except (ParseError, LibraryNotInstalled) as e: - if cli.debug: - raise - - utils.error_message([ - f'Parser issue with {cli.parser_name}:', f'{e.__class__.__name__}: {e}', - 'If this is the correct parser, try setting the locale to C (LC_ALL=C).', - f'For details use the -d or -dd option. Use "jc -h --{cli.parser_name}" for help.' - ]) - cli.jc_error = cli.JC_ERROR_EXIT - sys.exit(cli.combined_exit_code()) - - except Exception: - if cli.debug: - raise - - streaming_msg = '' - if _parser_is_streaming(cli.parser_module): - streaming_msg = 'Use the -qq option to ignore streaming parser errors.' - - utils.error_message([ - f'{cli.parser_name} parser could not parse the input data.', - f'{streaming_msg}', - 'If this is the correct parser, try setting the locale to C (LC_ALL=C).', - f'For details use the -d or -dd option. Use "jc -h --{cli.parser_name}" for help.' - ]) - cli.jc_error = cli.JC_ERROR_EXIT - sys.exit(cli.combined_exit_code()) + JcCli().run() if __name__ == '__main__':