1
0
mirror of https://github.com/httpie/cli.git synced 2025-08-10 22:42:05 +02:00

Implement HTTPie Nested JSON v2

This commit is contained in:
Batuhan Taskaya
2022-01-04 12:04:20 +03:00
parent 21faddc4b9
commit 7bf373751d
7 changed files with 665 additions and 228 deletions

View File

@@ -57,7 +57,7 @@ class KeyValueArgType:
def __init__(self, *separators: str):
self.separators = separators
self.special_characters = set('\\')
self.special_characters = set()
for separator in separators:
self.special_characters.update(separator)
@@ -113,7 +113,7 @@ class KeyValueArgType:
There are only two token types - strings and escaped characters:
>>> KeyValueArgType('=').tokenize(r'foo\=bar\\baz')
['foo', Escaped('='), 'bar', Escaped('\\'), 'baz']
['foo', Escaped('='), 'bar\\\\baz']
"""
tokens = ['']

View File

@@ -52,6 +52,8 @@ SEPARATOR_GROUP_DATA_EMBED_ITEMS = frozenset({
# Separator constants grouped for nested JSON items.
# NOTE(review): presumably these are the separators whose keys may use the
# nested JSON path syntax -- confirm against the argument parser.
SEPARATOR_GROUP_NESTED_JSON_ITEMS = frozenset([
SEPARATOR_DATA_STRING,
SEPARATOR_DATA_RAW_JSON,
SEPARATOR_DATA_EMBED_FILE_CONTENTS,
SEPARATOR_DATA_EMBED_RAW_JSON_FILE,
])
# Separators allowed in ITEM arguments

View File

@@ -1,150 +0,0 @@
"""
Routines for JSON form syntax, used to support nested JSON request items.
Heavily inspired by the great jarg project <https://github.com/jdp/jarg/blob/master/jarg/jsonform.py>.
"""
import re
import operator
from typing import Optional
def step(value: str, is_escaped: bool) -> str:
    """Return *value* with backslash-escaped brackets unescaped.

    The replacement only happens when ``is_escaped`` is true; otherwise the
    value is handed back untouched.
    """
    if not is_escaped:
        return value
    without_opening = value.replace(r'\[', '[')
    return without_opening.replace(r'\]', ']')
def find_opening_bracket(
    value: str,
    search=re.compile(r'(?<!\\)\[').search
) -> Optional[int]:
    """Return the index of the first ``[`` not preceded by a backslash.

    ``search`` is the bound ``search`` method of a pattern compiled once, when
    the function is defined. ``None`` is returned when *value* contains no
    unescaped opening bracket.
    """
    found = search(value)
    return found.start() if found else None
def find_closing_bracket(
    value: str,
    search=re.compile(r'(?<!\\)\]').search
) -> Optional[int]:
    """Return the index of the first ``]`` not preceded by a backslash.

    ``search`` is the bound ``search`` method of a pattern compiled once, when
    the function is defined. ``None`` is returned when *value* contains no
    unescaped closing bracket.
    """
    found = search(value)
    return found.start() if found else None
def parse_path(path):
    """
    Parse a string as a JSON path.

    An implementation of 'steps to parse a JSON encoding path'.
    <https://www.w3.org/TR/html-json-forms/#dfn-steps-to-parse-a-json-encoding-path>

    Returns a list of ``(key, flags)`` tuples. ``flags`` is a dict that may
    carry ``'type'`` (``'object'`` or ``'array'``), ``'append'`` and
    ``'last'``. Whenever the path turns out to be malformed, the whole input
    is returned as a single, final object key instead of raising.
    """
    original = path
    is_escaped = r'\[' in original

    opening_bracket = find_opening_bracket(original)
    # Fallback result: the complete path treated as one plain key.
    last_step = [(step(path, is_escaped), {'last': True, 'type': 'object'})]
    if opening_bracket is None:
        return last_step

    steps = [(step(original[:opening_bracket], is_escaped), {'type': 'object'})]
    path = original[opening_bracket:]
    while path:
        if path.startswith('[]'):
            steps[-1][1]['append'] = True
            path = path[2:]
            # '[]' is only valid as the very last component.
            if path:
                return last_step
        elif path[0] == '[':
            path = path[1:]
            closing_bracket = find_closing_bracket(path)
            if closing_bracket is None:
                return last_step
            key = path[:closing_bracket]
            path = path[closing_bracket + 1:]

            # Integer-looking keys address arrays, everything else objects.
            try:
                steps.append((int(key), {'type': 'array'}))
            except ValueError:
                steps.append((key, {'type': 'object'}))
        elif path[:2] == r'\[':
            escaped_closing = path.find(r'\]')
            if escaped_closing == -1:
                # An escaped '[' without its escaped ']' used to escape as an
                # uncaught ValueError from str.index(); treat it like every
                # other malformed path.
                return last_step
            key = step(path[1:escaped_closing + 2], is_escaped)
            path = path[escaped_closing + 2:]
            steps.append((key, {'type': 'object'}))
        else:
            return last_step

    # Each step takes the container type required by the step that follows it.
    for current, following in zip(steps, steps[1:]):
        current[1]['type'] = following[1]['type']
    steps[-1][1]['last'] = True
    return steps
def _pad_list_for_key(context, key):
    """Grow a list *context* with ``None`` until ``context[key]`` is assignable."""
    if isinstance(context, list) and len(context) <= key:
        context.extend([None] * (key - len(context) + 1))


def set_value(context, step, current_value, entry_value):
    """Apply a JSON value to a context object.

    An implementation of 'steps to set a JSON encoding value'.
    <https://www.w3.org/TR/html-json-forms/#dfn-steps-to-set-a-json-encoding-value>

    ``step`` is a ``(key, flags)`` tuple and ``current_value`` is whatever
    ``context[key]`` currently holds (``None`` when nothing is there).

    For the last step the value is stored (or appended to / merged with the
    existing one) and ``context`` is returned. For intermediate steps the
    container found or created at ``context[key]`` is returned so the caller
    can descend into it.
    """
    key, flags = step
    if flags.get('last', False):
        if current_value is None:
            # A list context must be long enough before any indexed store,
            # including the append case.
            _pad_list_for_key(context, key)
            if flags.get('append', False):
                context[key] = [entry_value]
            else:
                context[key] = entry_value
        elif isinstance(current_value, list):
            context[key].append(entry_value)
        else:
            # A second value for the same key turns the entry into a list.
            context[key] = [current_value, entry_value]
        return context

    if current_value is None:
        # Pad for both container kinds; assigning into a too-short list
        # would otherwise raise IndexError (e.g. for 'a[0][0]').
        _pad_list_for_key(context, key)
        context[key] = [] if flags.get('type') == 'array' else {}
        return context[key]
    if isinstance(current_value, dict):
        return context[key]
    if isinstance(current_value, list):
        if flags.get('type') == 'array':
            return current_value

        # An object is required where an array exists: convert the array into
        # an object keyed by index, dropping the holes, and always store it.
        obj = {
            index: item
            for index, item in enumerate(current_value)
            if item is not None
        }
        context[key] = obj
        return obj

    # A scalar is in the way: keep it under the empty key of a new object.
    obj = {'': current_value}
    context[key] = obj
    return obj
def interpret_json_form(pairs):
    """The application/json form encoding algorithm.

    <https://www.w3.org/TR/html-json-forms/#dfn-application-json-encoding-algorithm>

    Every ``(key, value)`` pair is parsed into path steps, and the steps are
    applied one after another starting from the shared result object.
    """
    result = {}
    for raw_key, entry_value in pairs:
        context = result
        for path_step in parse_path(raw_key):
            accessor = path_step[0]
            # A missing key or index is reported to set_value() as None.
            try:
                existing = context[accessor]
            except LookupError:
                existing = None
            context = set_value(context, path_step, existing, entry_value)
    return result

311
httpie/cli/nested_json.py Normal file
View File

@@ -0,0 +1,311 @@
from enum import Enum, auto
from typing import (
Any,
Iterator,
NamedTuple,
Optional,
List,
NoReturn,
Type,
Union,
)
class HTTPieSyntaxError(ValueError):
    """Error raised for a nested JSON key that cannot be parsed or applied.

    ``source`` is the complete key and ``message`` the human-readable
    explanation. ``token`` (optional) only needs ``start`` and ``end``
    attributes; when given, ``str()`` shows the source with a caret line
    underlining that span. ``message_kind`` is the word placed before
    "Error" in the first line.
    """

    def __init__(
        self,
        source: str,
        token: Optional['Token'],
        message: str,
        message_kind: str = 'Syntax',
    ) -> None:
        # Populate ``args`` so repr() and generic exception tooling show the
        # message instead of an empty ``HTTPieSyntaxError()``.
        super().__init__(message)
        self.source = source
        self.token = token
        self.message = message
        self.message_kind = message_kind

    def __str__(self):
        lines = [f'HTTPie {self.message_kind} Error: {self.message}']
        if self.token is not None:
            lines.append(self.source)
            # Underline the offending span directly below the source line.
            lines.append(
                ' ' * (self.token.start)
                + '^' * (self.token.end - self.token.start)
            )
        return '\n'.join(lines)
class TokenKind(Enum):
    """Kinds of tokens found in a nested JSON key."""

    TEXT = auto()
    NUMBER = auto()
    LEFT_BRACKET = auto()
    RIGHT_BRACKET = auto()

    def to_name(self) -> str:
        """Return how this kind is spelled in an 'Expecting ...' message.

        Operator kinds are shown as their quoted character; every other kind
        is shown as ``'a '`` followed by the lower-cased member name.
        """
        symbol = next(
            (char for char, kind in OPERATORS.items() if kind is self),
            None,
        )
        if symbol is None:
            return 'a ' + self.name.lower()
        return repr(symbol)
# Maps each single-character operator to the token kind emitted for it.
OPERATORS = {'[': TokenKind.LEFT_BRACKET, ']': TokenKind.RIGHT_BRACKET}
# Characters that lose their backslash when escaped in a key: the operators
# plus the backslash itself.
SPECIAL_CHARS = OPERATORS.keys() | {'\\'}
class Token(NamedTuple):
    """A lexical token of a nested JSON key, with its span in the source."""

    # Which kind of token this is.
    kind: TokenKind
    # An int for NUMBER tokens, otherwise the token's text.
    value: Union[str, int]
    # Offset of the token's first character in the source string.
    start: int
    # Offset just past the token's last character (exclusive).
    end: int
def assert_cant_happen() -> NoReturn:
    """Raise ``ValueError``; called from branches that should be unreachable."""
    error = ValueError("Unexpected value")
    raise error
def tokenize(source: str) -> Iterator[Token]:
    """Split a nested JSON key into tokens.

    ``[`` and ``]`` become operator tokens. Every run of other characters
    becomes one token: NUMBER when ``int()`` accepts the text, TEXT otherwise.
    A backslash in front of a special character (a bracket or another
    backslash) is dropped and the character is taken literally; in front of
    any other character both are kept. A backslash that ends the source has
    nothing to escape and is kept as literal text.
    """
    cursor = 0
    # Number of escaping backslashes dropped from the current buffer; needed
    # to map the buffer back onto its span in ``source``.
    backslashes = 0
    buffer = []

    def send_buffer() -> Iterator[Token]:
        nonlocal backslashes
        if not buffer:
            return None

        value = ''.join(buffer)
        try:
            value = int(value)
        except ValueError:
            kind = TokenKind.TEXT
        else:
            kind = TokenKind.NUMBER

        yield Token(
            kind, value, start=cursor - (len(buffer) + backslashes), end=cursor
        )
        buffer.clear()
        backslashes = 0

    def can_advance() -> bool:
        return cursor < len(source)

    def has_lookahead() -> bool:
        return cursor + 1 < len(source)

    while can_advance():
        index = source[cursor]
        if index in OPERATORS:
            yield from send_buffer()
            yield Token(OPERATORS[index], index, cursor, cursor + 1)
        elif index == '\\' and has_lookahead():
            # The look-ahead must exist before it is read: the previous guard
            # re-checked the current position, so a trailing backslash raised
            # IndexError on ``source[cursor + 1]``.
            if source[cursor + 1] in SPECIAL_CHARS:
                backslashes += 1
            else:
                buffer.append(index)

            buffer.append(source[cursor + 1])
            # Skip the character that was consumed together with the backslash.
            cursor += 1
        else:
            buffer.append(index)

        cursor += 1

    yield from send_buffer()
class Path:
    """One component of a parsed nested JSON key.

    ``kind`` is one of ``'key'``, ``'index'``, ``'append'`` or ``'set'``.
    ``accessor`` is the key or index (or, for ``'set'``, the value to store),
    ``tokens`` are the tokens the component was built from, and ``is_root``
    marks the leading, bracket-less component.
    """

    def __init__(
        self,
        kind: str,
        accessor: Optional[Union[str, int]] = None,
        tokens: Optional[List[Token]] = None,
        is_root: bool = False,
    ):
        self.kind = kind
        self.accessor = accessor
        self.tokens = tokens if tokens else []
        self.is_root = is_root

    def reconstruct(self) -> str:
        """Return the source-like spelling of this component."""
        kind = self.kind
        if kind == 'append':
            return '[]'
        if kind == 'index':
            return f'[{self.accessor}]'
        if kind == 'key':
            # The root key is written bare, every other key in brackets.
            if self.is_root:
                return self.accessor
            return '[' + self.accessor + ']'
        assert_cant_happen()
def parse(source: str) -> Iterator[Path]:
    """
    start: literal? path*

    literal: TEXT | NUMBER

    path:
        key_path
        | index_path
        | append_path
    key_path: LEFT_BRACKET TEXT RIGHT_BRACKET
    index_path: LEFT_BRACKET NUMBER RIGHT_BRACKET
    append_path: LEFT_BRACKET RIGHT_BRACKET

    Yields the root ``Path`` first (with an empty accessor when the source
    does not start with a literal), then one ``Path`` per bracketed
    component. Raises ``HTTPieSyntaxError`` when the tokens do not follow the
    grammar above.
    """

    tokens = list(tokenize(source))
    cursor = 0

    def can_advance():
        return cursor < len(tokens)

    def expect(*kinds):
        nonlocal cursor

        assert len(kinds) > 0
        if can_advance():
            token = tokens[cursor]
            cursor += 1
            if token.kind in kinds:
                return token
        else:
            # Ran out of tokens: point the error just past the last one.
            token = tokens[-1]._replace(
                start=tokens[-1].end + 0, end=tokens[-1].end + 1
            )

        if len(kinds) == 1:
            suffix = kinds[0].to_name()
        else:
            suffix = ', '.join(kind.to_name() for kind in kinds[:-1])
            suffix += ' or ' + kinds[-1].to_name()

        message = f'Expecting {suffix}'
        raise HTTPieSyntaxError(source, token, message)

    root = Path('key', '', is_root=True)
    if can_advance():
        token = tokens[cursor]
        if token.kind in {TokenKind.TEXT, TokenKind.NUMBER}:
            token = expect(TokenKind.TEXT, TokenKind.NUMBER)
            if token.kind is TokenKind.NUMBER:
                # The root is always an object key, so keep the spelling the
                # user typed. str(int(...)) would rewrite e.g. '007' to '7'
                # and '1_000' to '1000'.
                root.accessor = source[token.start:token.end]
            else:
                root.accessor = str(token.value)
            root.tokens.append(token)

    yield root

    while can_advance():
        path_tokens = []
        path_tokens.append(expect(TokenKind.LEFT_BRACKET))

        token = expect(
            TokenKind.TEXT, TokenKind.NUMBER, TokenKind.RIGHT_BRACKET
        )
        path_tokens.append(token)
        if token.kind is TokenKind.RIGHT_BRACKET:
            path = Path('append', tokens=path_tokens)
        elif token.kind is TokenKind.TEXT:
            path = Path('key', token.value, tokens=path_tokens)
            path_tokens.append(expect(TokenKind.RIGHT_BRACKET))
        elif token.kind is TokenKind.NUMBER:
            path = Path('index', token.value, tokens=path_tokens)
            path_tokens.append(expect(TokenKind.RIGHT_BRACKET))
        else:
            assert_cant_happen()
        yield path
# Python types mapped to the JSON type names shown in type-error messages.
JSON_TYPE_MAPPING = {
    dict: 'object',
    list: 'array',
    int: 'number',
    float: 'number',
    str: 'string',
}
def interpret(context: Any, key: str, value: Any) -> Any:
    """Store *value* inside *context* at the location described by *key*.

    The key is parsed into path components which are walked from the root,
    creating the missing dicts and lists on the way. ``context`` is modified
    in place and returned. ``HTTPieSyntaxError`` is raised when an existing
    value has the wrong type for the requested access, or when a negative
    index is used.
    """
    cursor = context

    paths = list(parse(key))
    # Sentinel component carrying the value to store.
    paths.append(Path('set', value))

    def type_check(index: int, path: Path, expected_type: Type[Any]) -> None:
        if isinstance(cursor, expected_type):
            return None

        pseudo_token = None
        if path.tokens:
            # Span covering every token of the offending component.
            pseudo_token = Token(
                None, None, path.tokens[0].start, path.tokens[-1].end
            )

        actual_type = type(cursor)
        cursor_type = JSON_TYPE_MAPPING.get(actual_type, actual_type.__name__)
        required_type = JSON_TYPE_MAPPING[expected_type]
        accessed = ''.join(
            previous.reconstruct() for previous in paths[:index]
        )

        message = (
            f"Can't perform {path.kind!r} based access on "
            + repr(accessed)
            + f' which has a type of {cursor_type!r} but this operation'
            + f' requires a type of {required_type!r}.'
        )
        raise HTTPieSyntaxError(
            key, pseudo_token, message, message_kind='Type'
        )

    def object_for(kind: str) -> Union[dict, list]:
        if kind == 'key':
            return {}
        if kind in {'index', 'append'}:
            return []
        assert_cant_happen()

    for index, path in enumerate(paths[:-1]):
        next_path = paths[index + 1]
        is_final = next_path.kind == 'set'

        if path.kind == 'key':
            type_check(index, path, dict)
            if is_final:
                cursor[path.accessor] = next_path.accessor
                break

            cursor = cursor.setdefault(
                path.accessor, object_for(next_path.kind)
            )
        elif path.kind == 'index':
            type_check(index, path, list)
            position = path.accessor
            if position < 0:
                raise HTTPieSyntaxError(
                    key,
                    path.tokens[1],
                    'Negative indexes are not supported.',
                    message_kind='Value',
                )

            # Pad with None so that ``position`` is a valid index.
            missing = position - len(cursor) + 1
            cursor.extend([None] * missing)
            if is_final:
                cursor[position] = next_path.accessor
                break

            if cursor[position] is None:
                cursor[position] = object_for(next_path.kind)
            cursor = cursor[position]
        elif path.kind == 'append':
            type_check(index, path, list)
            if is_final:
                cursor.append(next_path.accessor)
                break

            cursor.append(object_for(next_path.kind))
            cursor = cursor[-1]
        else:
            assert_cant_happen()

    return context
def interpret_nested_json(pairs):
    """Build one object by applying every ``(key, value)`` pair in order."""
    result = {}
    for nested_key, entry in pairs:
        interpret(result, nested_key, entry)
    return result

View File

@@ -17,7 +17,7 @@ from .dicts import (
RequestQueryParamsDict,
)
from .exceptions import ParseError
from .json_form import interpret_json_form
from .nested_json import interpret_nested_json
from ..utils import get_content_type, load_json_preserve_order_and_dupe_keys, split
@@ -202,7 +202,7 @@ def process_data_raw_json_embed_arg(arg: KeyValueArg) -> JSONType:
def process_data_nested_json_embed_args(pairs) -> Dict[str, JSONType]:
return interpret_json_form(pairs)
return interpret_nested_json(pairs)
def load_text_file(item: KeyValueArg) -> str: