1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-19 00:17:51 +02:00

add slicer functionality

This commit is contained in:
Kelly Brazil
2023-01-23 13:57:48 -08:00
parent 13ffe8a84d
commit 0c82fe7e4d

172
jc/cli.py
View File

@ -5,11 +5,13 @@ JC cli module
import io
import sys
import os
import re
from itertools import islice
from datetime import datetime, timezone
import textwrap
import shlex
import subprocess
from typing import List, Dict, Union, Optional, TextIO
from typing import List, Dict, Iterable, Union, Optional, TextIO
from types import ModuleType
from .lib import (
__version__, parser_info, all_parser_info, parsers, _get_parser, _parser_is_streaming,
@ -69,11 +71,11 @@ class JcCli():
'help_me', 'pretty', 'quiet', 'ignore_exceptions', 'raw', 'meta_out', 'unbuffer',
'version_info', 'yaml_output', 'bash_comp', 'zsh_comp', 'magic_found_parser',
'magic_options', 'magic_run_command', 'magic_run_command_str', 'magic_stdout',
'magic_stderr', 'magic_returncode'
'magic_stderr', 'magic_returncode', 'slice_start', 'slice_end'
)
def __init__(self) -> None:
self.data_in: Optional[Union[str, bytes, TextIO]] = None
self.data_in: Optional[Union[str, bytes, TextIO, Iterable[str]]] = None
self.data_out: Optional[Union[List[JSONDictType], JSONDictType]] = None
self.options: List[str] = []
self.args: List[str] = []
@ -89,6 +91,10 @@ class JcCli():
self.json_indent: Optional[int] = None
self.run_timestamp: Optional[datetime] = None
# slicer
self.slice_start: Optional[int] = None
self.slice_end: Optional[int] = None
# cli options
self.about: bool = False
self.debug: bool = False
@ -574,59 +580,6 @@ class JcCli():
utils.error_message(['Missing piped data. Use "jc -h" for help.'])
self.exit_error()
def streaming_parse_and_print(self) -> None:
"""only supports UTF-8 string data for now"""
self.data_in = sys.stdin
if self.parser_module:
result = self.parser_module.parse(
self.data_in,
raw=self.raw,
quiet=self.quiet,
ignore_exceptions=self.ignore_exceptions
)
for line in result:
self.data_out = line
if self.meta_out:
self.run_timestamp = datetime.now(timezone.utc)
self.add_metadata_to_output()
self.safe_print_out()
def standard_parse_and_print(self) -> None:
"""supports binary and UTF-8 string data"""
self.data_in = self.magic_stdout or sys.stdin.buffer.read()
# convert to UTF-8, if possible. Otherwise, leave as bytes
try:
if isinstance(self.data_in, bytes):
self.data_in = self.data_in.decode('utf-8')
except UnicodeDecodeError:
pass
if self.parser_module:
self.data_out = self.parser_module.parse(
self.data_in,
raw=self.raw,
quiet=self.quiet
)
if self.meta_out:
self.run_timestamp = datetime.now(timezone.utc)
self.add_metadata_to_output()
self.safe_print_out()
def exit_clean(self) -> None:
exit_code: int = self.magic_returncode + JC_CLEAN_EXIT
exit_code = min(exit_code, MAX_EXIT)
sys.exit(exit_code)
def exit_error(self) -> None:
exit_code: int = self.magic_returncode + JC_ERROR_EXIT
exit_code = min(exit_code, MAX_EXIT)
sys.exit(exit_code)
def add_metadata_to_output(self) -> None:
"""
This function mutates data_out in place. If the _jc_meta field
@ -669,6 +622,113 @@ class JcCli():
utils.error_message(['Parser returned an unsupported object type.'])
self.exit_error()
@staticmethod
def lazy_splitlines(text: str) -> Iterable[str]:
start = 0
for m in re.finditer(r'(\r\n|\r|\n)', text):
begin, end = m.span()
if begin != start:
yield text[start:begin]
start = end
if text[start:]:
yield text[start:]
def slicer(self) -> None:
"""Slice input data lazily, if possible. Updates self.data_in"""
self.slice_start = -20001 # 5
self.slice_end = -20000 # -2 or 8
if not self.slice_start is None or not self.slice_end is None:
# standard parsers UTF-8 input
if isinstance(self.data_in, str):
data_in_iter = self.lazy_splitlines(self.data_in)
# positive slices
if (self.slice_start is None or self.slice_start >= 0) \
and (self.slice_end is None or self.slice_end >= 0):
self.data_in = '\n'.join(islice(data_in_iter, self.slice_start, self.slice_end))
# negative slices found (non-lazy, uses more memory)
else:
self.data_in = '\n'.join(list(data_in_iter)[self.slice_start:self.slice_end])
# standard parsers bytes input
elif isinstance(self.data_in, bytes):
utils.warning_message(['Cannot slice bytes data.'])
# streaming parsers UTF-8 input
else:
# positive slices
if (self.slice_start is None or self.slice_start >= 0) \
and (self.slice_end is None or self.slice_end >= 0) \
and self.data_in:
self.data_in = islice(self.data_in, self.slice_start, self.slice_end)
# negative slices found (non-lazy, uses more memory)
elif self.data_in:
self.data_in = list(self.data_in)[self.slice_start:self.slice_end]
def streaming_parse_and_print(self) -> None:
"""only supports UTF-8 string data for now"""
self.data_in = sys.stdin
self.slicer()
if self.parser_module:
result = self.parser_module.parse(
self.data_in,
raw=self.raw,
quiet=self.quiet,
ignore_exceptions=self.ignore_exceptions
)
for line in result:
self.data_out = line
if self.meta_out:
self.run_timestamp = datetime.now(timezone.utc)
self.add_metadata_to_output()
self.safe_print_out()
def standard_parse_and_print(self) -> None:
"""supports binary and UTF-8 string data"""
self.data_in = self.magic_stdout or sys.stdin.buffer.read()
# convert to UTF-8, if possible. Otherwise, leave as bytes
try:
if isinstance(self.data_in, bytes):
self.data_in = self.data_in.decode('utf-8')
except UnicodeDecodeError:
pass
self.slicer()
if self.parser_module:
self.data_out = self.parser_module.parse(
self.data_in,
raw=self.raw,
quiet=self.quiet
)
if self.meta_out:
self.run_timestamp = datetime.now(timezone.utc)
self.add_metadata_to_output()
self.safe_print_out()
def exit_clean(self) -> None:
exit_code: int = self.magic_returncode + JC_CLEAN_EXIT
exit_code = min(exit_code, MAX_EXIT)
sys.exit(exit_code)
def exit_error(self) -> None:
exit_code: int = self.magic_returncode + JC_ERROR_EXIT
exit_code = min(exit_code, MAX_EXIT)
sys.exit(exit_code)
def _run(self) -> None:
# enable colors for Windows cmd.exe terminal
if sys.platform.startswith('win32'):