mirror of
https://github.com/kellyjonbrazil/jc.git
synced 2025-07-13 01:20:24 +02:00
relax JSONDictType - unions and Dicts cause a lot of friction
This commit is contained in:
@ -51,7 +51,7 @@ Add `_jc_meta` object to output line if `ignore_exceptions=True`
|
||||
### stream\_error
|
||||
|
||||
```python
|
||||
def stream_error(e: BaseException, line: str) -> Dict[str, MetadataType]
|
||||
def stream_error(e: BaseException, line: str) -> JSONDictType
|
||||
```
|
||||
|
||||
Return an error `_jc_meta` field.
|
||||
|
14
jc/cli.py
14
jc/cli.py
@ -15,7 +15,7 @@ from .lib import (
|
||||
__version__, parser_info, all_parser_info, parsers, _get_parser, _parser_is_streaming,
|
||||
parser_mod_list, standard_parser_mod_list, plugin_parser_mod_list, streaming_parser_mod_list
|
||||
)
|
||||
from .jc_types import JSONDictType, AboutJCType, MetadataType, CustomColorType, ParserInfoType
|
||||
from .jc_types import JSONDictType, CustomColorType, ParserInfoType
|
||||
from . import utils
|
||||
from .cli_data import (
|
||||
long_options_map, new_pygments_colors, old_pygments_colors, helptext_preamble_string,
|
||||
@ -74,7 +74,7 @@ class JcCli():
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.data_in: Optional[Union[str, bytes, TextIO]] = None
|
||||
self.data_out: Optional[Union[List[JSONDictType], JSONDictType, AboutJCType]] = None
|
||||
self.data_out: Optional[Union[List[JSONDictType], JSONDictType]] = None
|
||||
self.options: List[str] = []
|
||||
self.args: List[str] = []
|
||||
self.parser_module: Optional[ModuleType] = None
|
||||
@ -214,7 +214,7 @@ class JcCli():
|
||||
return otext
|
||||
|
||||
@staticmethod
|
||||
def about_jc() -> AboutJCType:
|
||||
def about_jc() -> JSONDictType:
|
||||
"""Return jc info and the contents of each parser.info as a dictionary"""
|
||||
return {
|
||||
'name': 'jc',
|
||||
@ -598,7 +598,7 @@ class JcCli():
|
||||
even if there are no results.
|
||||
"""
|
||||
if self.run_timestamp:
|
||||
meta_obj: MetadataType = {
|
||||
meta_obj: JSONDictType = {
|
||||
'parser': self.parser_name,
|
||||
'timestamp': self.run_timestamp.timestamp()
|
||||
}
|
||||
@ -609,9 +609,9 @@ class JcCli():
|
||||
|
||||
if isinstance(self.data_out, dict):
|
||||
if '_jc_meta' not in self.data_out:
|
||||
self.data_out['_jc_meta'] = {} # type: ignore
|
||||
self.data_out['_jc_meta'] = {}
|
||||
|
||||
self.data_out['_jc_meta'].update(meta_obj) # type: ignore
|
||||
self.data_out['_jc_meta'].update(meta_obj)
|
||||
|
||||
elif isinstance(self.data_out, list):
|
||||
if not self.data_out:
|
||||
@ -622,7 +622,7 @@ class JcCli():
|
||||
if '_jc_meta' not in item:
|
||||
item['_jc_meta'] = {}
|
||||
|
||||
item['_jc_meta'].update(meta_obj) # type: ignore
|
||||
item['_jc_meta'].update(meta_obj)
|
||||
|
||||
else:
|
||||
utils.error_message(['Parser returned an unsupported object type.'])
|
||||
|
@ -1,11 +1,9 @@
|
||||
"""jc - JSON Convert lib module"""
|
||||
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Tuple, Iterator, Optional, Union
|
||||
from typing import Any, Dict, List, Tuple, Iterator, Optional, Union
|
||||
|
||||
JSONDictType = Dict[str, Union[str, int, float, bool, List, Dict, None]]
|
||||
MetadataType = Dict[str, Optional[Union[str, int, float, List[str], datetime]]]
|
||||
JSONDictType = Dict[str, Any]
|
||||
StreamingOutputType = Iterator[Union[JSONDictType, Tuple[BaseException, str]]]
|
||||
|
||||
if sys.version_info >= (3, 8):
|
||||
@ -45,9 +43,6 @@ else:
|
||||
TimeStampFormatType = Dict
|
||||
|
||||
|
||||
AboutJCType = Dict[str, Union[str, int, List[ParserInfoType]]]
|
||||
|
||||
|
||||
try:
|
||||
from pygments.token import (Name, Number, String, Keyword)
|
||||
CustomColorType = Dict[Union[Name.Tag, Number, String, Keyword], str]
|
||||
|
@ -212,7 +212,7 @@ Examples:
|
||||
"""
|
||||
import re
|
||||
from ipaddress import IPv4Network
|
||||
from typing import List, Dict
|
||||
from typing import List, Dict, Optional
|
||||
from jc.jc_types import JSONDictType
|
||||
import jc.utils
|
||||
|
||||
@ -230,7 +230,7 @@ class info():
|
||||
__version__ = info.version
|
||||
|
||||
|
||||
def _convert_cidr_to_quad(string):
|
||||
def _convert_cidr_to_quad(string: str) -> str:
|
||||
return str(IPv4Network('0.0.0.0/' + string).netmask)
|
||||
|
||||
|
||||
@ -261,72 +261,72 @@ def _process(proc_data: List[JSONDictType]) -> List[JSONDictType]:
|
||||
# convert OSX-style subnet mask to dotted quad
|
||||
if 'ipv4_mask' in entry:
|
||||
try:
|
||||
if entry['ipv4_mask'].startswith('0x'): # type: ignore
|
||||
if entry['ipv4_mask'].startswith('0x'):
|
||||
new_mask = entry['ipv4_mask']
|
||||
new_mask = new_mask.lstrip('0x') # type: ignore
|
||||
new_mask = new_mask.lstrip('0x')
|
||||
new_mask = '.'.join(str(int(i, 16)) for i in [new_mask[i:i + 2] for i in range(0, len(new_mask), 2)])
|
||||
entry['ipv4_mask'] = new_mask
|
||||
except (ValueError, TypeError, AttributeError):
|
||||
pass
|
||||
|
||||
# for new-style freebsd output convert CIDR mask to dotted-quad to match other output
|
||||
if entry['ipv4_mask'] and not '.' in entry['ipv4_mask']: # type: ignore
|
||||
if entry['ipv4_mask'] and not '.' in entry['ipv4_mask']:
|
||||
entry['ipv4_mask'] = _convert_cidr_to_quad(entry['ipv4_mask'])
|
||||
|
||||
# convert state value to an array
|
||||
if 'state' in entry:
|
||||
try:
|
||||
new_state = entry['state'].split(',') # type: ignore
|
||||
new_state = entry['state'].split(',')
|
||||
entry['state'] = new_state
|
||||
except (ValueError, TypeError, AttributeError):
|
||||
pass
|
||||
|
||||
# conversions for list of ipv4 addresses
|
||||
if 'ipv4' in entry:
|
||||
for ip_address in entry['ipv4']: # type: ignore
|
||||
for ip_address in entry['ipv4']:
|
||||
if 'mask' in ip_address:
|
||||
try:
|
||||
if ip_address['mask'].startswith('0x'): # type: ignore
|
||||
new_mask = ip_address['mask'] # type: ignore
|
||||
if ip_address['mask'].startswith('0x'):
|
||||
new_mask = ip_address['mask']
|
||||
new_mask = new_mask.lstrip('0x')
|
||||
new_mask = '.'.join(str(int(i, 16)) for i in [new_mask[i:i + 2] for i in range(0, len(new_mask), 2)])
|
||||
ip_address['mask'] = new_mask # type: ignore
|
||||
ip_address['mask'] = new_mask
|
||||
except (ValueError, TypeError, AttributeError):
|
||||
pass
|
||||
|
||||
# for new-style freebsd output convert CIDR mask to dotted-quad to match other output
|
||||
if ip_address['mask'] and not '.' in ip_address['mask']: # type: ignore
|
||||
ip_address['mask'] = _convert_cidr_to_quad(ip_address['mask']) # type: ignore
|
||||
if ip_address['mask'] and not '.' in ip_address['mask']:
|
||||
ip_address['mask'] = _convert_cidr_to_quad(ip_address['mask'])
|
||||
|
||||
# conversions for list of ipv6 addresses
|
||||
if 'ipv6' in entry:
|
||||
for ip_address in entry['ipv6']: # type: ignore
|
||||
for ip_address in entry['ipv6']:
|
||||
if 'mask' in ip_address:
|
||||
ip_address['mask'] = jc.utils.convert_to_int(ip_address['mask']) # type: ignore
|
||||
ip_address['mask'] = jc.utils.convert_to_int(ip_address['mask'])
|
||||
|
||||
# conversions for list of lanes
|
||||
if 'lanes' in entry:
|
||||
for lane_item in entry['lanes']: # type: ignore
|
||||
for lane_item in entry['lanes']:
|
||||
for key in lane_item:
|
||||
if key in int_list:
|
||||
lane_item[key] = jc.utils.convert_to_int(lane_item[key]) # type: ignore
|
||||
lane_item[key] = jc.utils.convert_to_int(lane_item[key])
|
||||
if key in float_list:
|
||||
lane_item[key] = jc.utils.convert_to_float(lane_item[key]) # type: ignore
|
||||
lane_item[key] = jc.utils.convert_to_float(lane_item[key])
|
||||
|
||||
# final conversions
|
||||
if entry.get('media_flags', None):
|
||||
entry['media_flags'] = entry['media_flags'].split(',') # type: ignore
|
||||
entry['media_flags'] = entry['media_flags'].split(',')
|
||||
|
||||
if entry.get('nd6_flags', None):
|
||||
entry['nd6_flags'] = entry['nd6_flags'].split(',') # type: ignore
|
||||
entry['nd6_flags'] = entry['nd6_flags'].split(',')
|
||||
|
||||
if entry.get('options_flags', None):
|
||||
entry['options_flags'] = entry['options_flags'].split(',') # type: ignore
|
||||
entry['options_flags'] = entry['options_flags'].split(',')
|
||||
|
||||
return proc_data
|
||||
|
||||
|
||||
def _bundle_match(pattern_list, string):
|
||||
def _bundle_match(pattern_list: List[re.Pattern], string: str) -> Optional[re.Match]:
|
||||
"""Returns a match object if a string matches one of a list of patterns.
|
||||
If no match is found, returns None"""
|
||||
for pattern in pattern_list:
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
from functools import wraps
|
||||
from typing import Dict, Tuple, Union, Iterable, Callable, TypeVar, cast, Any
|
||||
from .jc_types import JSONDictType, MetadataType
|
||||
from .jc_types import JSONDictType
|
||||
|
||||
|
||||
F = TypeVar('F', bound=Callable[..., Any])
|
||||
@ -31,7 +31,7 @@ def stream_success(output_line: JSONDictType, ignore_exceptions: bool) -> JSONDi
|
||||
return output_line
|
||||
|
||||
|
||||
def stream_error(e: BaseException, line: str) -> Dict[str, MetadataType]:
|
||||
def stream_error(e: BaseException, line: str) -> JSONDictType:
|
||||
"""
|
||||
Return an error `_jc_meta` field.
|
||||
"""
|
||||
|
Reference in New Issue
Block a user