1
0
mirror of https://github.com/httpie/cli.git synced 2025-02-09 13:14:03 +02:00

Major clean-up and refactoring.

This commit is contained in:
Jakub Roztocil 2012-04-25 01:32:53 +02:00
parent 67d6426360
commit c5b1aaaa28
5 changed files with 345 additions and 311 deletions

View File

@ -1,137 +1,41 @@
#!/usr/bin/env python #!/usr/bin/env python
import sys import sys
import json import json
try:
from collections import OrderedDict
except ImportError:
OrderedDict = dict
import requests import requests
from requests.compat import urlparse, str, is_py3 from requests.compat import str
from requests.structures import CaseInsensitiveDict from . import httpmessage
from . import cliparse
from . import cli from . import cli
from . import pretty from . import pretty
from . import __version__ as version
NEW_LINE = str('\n')
DEFAULT_UA = 'HTTPie/%s' % version
TYPE_FORM = 'application/x-www-form-urlencoded; charset=utf-8' TYPE_FORM = 'application/x-www-form-urlencoded; charset=utf-8'
TYPE_JSON = 'application/json; charset=utf-8' TYPE_JSON = 'application/json; charset=utf-8'
class HTTPMessage(object): def _get_response(parser, args, stdin, stdin_isatty):
def __init__(self, line, headers, body, content_type=None):
# {Request,Status}-Line
self.line = line
self.headers = headers
self.body = body
self.content_type = content_type
def format_http_message(message, prettifier=None,
with_headers=True, with_body=True):
bits = []
if with_headers:
if prettifier:
bits.append(prettifier.headers(message.line))
bits.append(prettifier.headers(message.headers))
else:
bits.append(message.line)
bits.append(message.headers)
if with_body and message.body:
bits.append(NEW_LINE)
if with_body and message.body:
if prettifier and message.content_type:
bits.append(prettifier.body(message.body, message.content_type))
else:
bits.append(message.body)
bits.append(NEW_LINE)
return NEW_LINE.join(bit.strip() for bit in bits)
def make_request_message(request):
"""Make an `HTTPMessage` from `requests.models.Request`."""
url = urlparse(request.url)
request_headers = dict(request.headers)
if 'Host' not in request_headers:
request_headers['Host'] = url.netloc
return HTTPMessage(
line='{method} {path} HTTP/1.1'.format(
method=request.method,
path=url.path or '/'),
headers=NEW_LINE.join(str('%s: %s') % (name, value)
for name, value
in request_headers.items()),
body=request._enc_data,
content_type=request_headers.get('Content-Type')
)
def make_response_message(response):
"""Make an `HTTPMessage` from `requests.models.Response`."""
encoding = response.encoding or 'ISO-8859-1'
original = response.raw._original_response
response_headers = response.headers
return HTTPMessage(
line='HTTP/{version} {status} {reason}'.format(
version='.'.join(str(original.version)),
status=original.status, reason=original.reason,),
headers=str(original.msg),
body=response.content.decode(encoding) if response.content else '',
content_type=response_headers.get('Content-Type'))
def main(args=None,
stdin=sys.stdin,
stdin_isatty=sys.stdin.isatty(),
stdout=sys.stdout,
stdout_isatty=sys.stdout.isatty()):
parser = cli.parser
args = parser.parse_args(args if args is not None else sys.argv[1:])
do_prettify = (args.prettify is True or
(args.prettify == cli.PRETTIFY_STDOUT_TTY_ONLY
and stdout_isatty))
# Parse request headers and data from the command line.
headers = CaseInsensitiveDict()
headers['User-Agent'] = DEFAULT_UA
data = OrderedDict()
files = OrderedDict()
try:
cli.parse_items(items=args.items, headers=headers,
data=data, files=files)
except cli.ParseError as e:
if args.traceback:
raise
parser.error(e.message)
if files and not args.form:
# We could just switch to --form automatically here,
# but I think it's better to make it explicit.
parser.error(
' You need to set the --form / -f flag to'
' to issue a multipart request. File fields: %s'
% ','.join(files.keys()))
if not stdin_isatty: if not stdin_isatty:
if data: if args.data:
parser.error('Request body (stdin) and request ' parser.error('Request body (stdin) and request '
'data (key=value) cannot be mixed.') 'data (key=value) cannot be mixed.')
data = stdin.read() args.data = stdin.read()
# JSON/Form content type. if args.json or (not args.form and args.data):
if args.json or (not args.form and data): # JSON
if args.method.lower() == 'get' and 'Accept' not in headers: if not args.files and (
headers['Accept'] = 'application/json' 'Content-Type' not in args.headers
and (args.data or args.json)):
args.headers['Content-Type'] = TYPE_JSON
if stdin_isatty: if stdin_isatty:
data = json.dumps(data) # Serialize the parsed data.
if not files and ('Content-Type' not in headers and (data or args.json)): args.data = json.dumps(args.data)
headers['Content-Type'] = TYPE_JSON if args.method.lower() == 'get' and 'Accept' not in args.headers:
elif not files and 'Content-Type' not in headers: # Default Accept to JSON as well.
headers['Content-Type'] = TYPE_FORM args.headers['Accept'] = 'application/json'
elif not args.files and 'Content-Type' not in args.headers:
# Form
args.headers['Content-Type'] = TYPE_FORM
# Fire the request. # Fire the request.
try: try:
@ -142,57 +46,75 @@ def main(args=None,
else requests.auth.HTTPBasicAuth) else requests.auth.HTTPBasicAuth)
credentials = auth_type(args.auth.key, args.auth.value) credentials = auth_type(args.auth.key, args.auth.value)
response = requests.request( return requests.request(
method=args.method.lower(), method=args.method.lower(),
url=args.url if '://' in args.url else 'http://%s' % args.url, url=args.url if '://' in args.url else 'http://%s' % args.url,
headers=headers, headers=args.headers,
data=data, data=args.data,
verify={'yes': True, 'no': False}.get(args.verify, args.verify), verify={'yes': True, 'no': False}.get(args.verify, args.verify),
timeout=args.timeout, timeout=args.timeout,
auth=credentials, auth=credentials,
proxies=dict((p.key, p.value) for p in args.proxy), proxies=dict((p.key, p.value) for p in args.proxy),
files=files, files=args.files,
allow_redirects=args.allow_redirects, allow_redirects=args.allow_redirects,
) )
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
sys.stderr.write(NEW_LINE) sys.stderr.write('\n')
sys.exit(1) sys.exit(1)
except Exception as e: except Exception as e:
if args.traceback: if args.traceback:
raise raise
sys.stderr.write(str(e.message) + NEW_LINE) sys.stderr.write(str(e.message) + '\n')
sys.exit(1) sys.exit(1)
def _get_output(args, stdout_isatty, response):
do_prettify = (args.prettify is True or
(args.prettify == cliparse.PRETTIFY_STDOUT_TTY_ONLY
and stdout_isatty))
do_output_request = (cliparse.OUT_REQ_HEADERS in args.output_options
or cliparse.OUT_REQ_BODY in args.output_options)
do_output_response = (cliparse.OUT_RESP_HEADERS in args.output_options
or cliparse.OUT_RESP_BODY in args.output_options)
prettifier = pretty.PrettyHttp(args.style) if do_prettify else None prettifier = pretty.PrettyHttp(args.style) if do_prettify else None
do_output_request = (cli.OUT_REQ_HEADERS in args.output_options
or cli.OUT_REQ_BODY in args.output_options)
do_output_response = (cli.OUT_RESP_HEADERS in args.output_options
or cli.OUT_RESP_BODY in args.output_options)
output = [] output = []
if do_output_request: if do_output_request:
output.append(format_http_message( output.append(httpmessage.format(
message=make_request_message(response.request), message=httpmessage.from_request(response.request),
prettifier=prettifier, prettifier=prettifier,
with_headers=cli.OUT_REQ_HEADERS in args.output_options, with_headers=cliparse.OUT_REQ_HEADERS in args.output_options,
with_body=cli.OUT_REQ_BODY in args.output_options with_body=cliparse.OUT_REQ_BODY in args.output_options
)) ))
if do_output_response: if do_output_response:
output.append(NEW_LINE) output.append('\n')
if do_output_response: if do_output_response:
output.append(format_http_message( output.append(httpmessage.format(
message=make_response_message(response), message=httpmessage.from_response(response),
prettifier=prettifier, prettifier=prettifier,
with_headers=cli.OUT_RESP_HEADERS in args.output_options, with_headers=cliparse.OUT_RESP_HEADERS in args.output_options,
with_body=cli.OUT_RESP_BODY in args.output_options with_body=cliparse.OUT_RESP_BODY in args.output_options
)) ))
output.append(NEW_LINE) output.append('\n')
output_bytes = ''.join(output).encode('utf8') return ''.join(output)
f = (stdout.buffer if is_py3 and hasattr(stdout, 'buffer') else stdout)
def main(args=None,
stdin=sys.stdin, stdin_isatty=sys.stdin.isatty(),
stdout=sys.stdout, stdout_isatty=sys.stdout.isatty()):
parser = cli.parser
args = parser.parse_args(args if args is not None else sys.argv[1:])
response = _get_response(parser, args, stdin, stdin_isatty)
output = _get_output(args, stdout_isatty, response)
output_bytes = output.encode('utf8')
f = (stdout.buffer if hasattr(stdout, 'buffer') else stdout)
f.write(output_bytes) f.write(output_bytes)

View File

@ -1,118 +1,11 @@
import os """
import json CLI definition.
import argparse
import re """
from collections import namedtuple
from . import pretty from . import pretty
from . import __doc__ as doc from . import __doc__ as doc
from . import __version__ as version from . import __version__ as version
from . import cliparse
SEP_COMMON = ':'
SEP_HEADERS = SEP_COMMON
SEP_DATA = '='
SEP_DATA_RAW_JSON = ':='
SEP_FILES = '@'
OUT_REQ_HEADERS = 'H'
OUT_REQ_BODY = 'B'
OUT_RESP_HEADERS = 'h'
OUT_RESP_BODY = 'b'
OUTPUT_OPTIONS = [OUT_REQ_HEADERS,
OUT_REQ_BODY,
OUT_RESP_HEADERS,
OUT_RESP_BODY]
PRETTIFY_STDOUT_TTY_ONLY = object()
class ParseError(Exception):
pass
KeyValue = namedtuple('KeyValue', ['key', 'value', 'sep', 'orig'])
class KeyValueType(object):
"""A type used with `argparse`."""
def __init__(self, *separators):
self.separators = separators
self.escapes = ['\\\\' + sep for sep in separators]
def __call__(self, string):
found = {}
found_escapes = []
for esc in self.escapes:
found_escapes += [m.span() for m in re.finditer(esc, string)]
for sep in self.separators:
matches = re.finditer(sep, string)
for match in matches:
start, end = match.span()
inside_escape = False
for estart, eend in found_escapes:
if start >= estart and end <= eend:
inside_escape = True
break
if not inside_escape:
found[start] = sep
if not found:
raise argparse.ArgumentTypeError(
'"%s" is not a valid value' % string)
# split the string at the earliest non-escaped separator.
seploc = min(found.keys())
sep = found[seploc]
key = string[:seploc]
value = string[seploc + len(sep):]
# remove escape chars
for sepstr in self.separators:
key = key.replace('\\' + sepstr, sepstr)
value = value.replace('\\' + sepstr, sepstr)
return KeyValue(key=key, value=value, sep=sep, orig=string)
def parse_items(items, data=None, headers=None, files=None):
"""Parse `KeyValueType` `items` into `data`, `headers` and `files`."""
if headers is None:
headers = {}
if data is None:
data = {}
if files is None:
files = {}
for item in items:
value = item.value
key = item.key
if item.sep == SEP_HEADERS:
target = headers
elif item.sep == SEP_FILES:
try:
value = open(os.path.expanduser(item.value), 'r')
except IOError as e:
raise ParseError(
'Invalid argument %r. %s' % (item.orig, e))
if not key:
key = os.path.basename(value.name)
target = files
elif item.sep in [SEP_DATA, SEP_DATA_RAW_JSON]:
if item.sep == SEP_DATA_RAW_JSON:
try:
value = json.loads(item.value)
except ValueError:
raise ParseError('%s is not valid JSON' % item.orig)
target = data
else:
raise ParseError('%s is not valid item' % item.orig)
if key in target:
ParseError('duplicate item %s (%s)' % (item.key, item.orig))
target[key] = value
return headers, data, files
def _(text): def _(text):
@ -120,27 +13,10 @@ def _(text):
return ' '.join(text.strip().split()) return ' '.join(text.strip().split())
class HTTPieArgumentParser(argparse.ArgumentParser): parser = cliparse.HTTPieArgumentParser(description=doc.strip(),)
def parse_args(self, args=None, namespace=None):
args = super(HTTPieArgumentParser, self).parse_args(args, namespace)
self._validate_output_options(args)
self._validate_auth_options(args)
return args
def _validate_output_options(self, args):
unknown_output_options = set(args.output_options) - set(OUTPUT_OPTIONS)
if unknown_output_options:
self.error('Unknown output options: %s' % ','.join(unknown_output_options))
def _validate_auth_options(self, args):
if args.auth_type and not args.auth:
self.error('--auth-type can only be used with --auth')
parser = HTTPieArgumentParser(description=doc.strip(),)
parser.add_argument('--version', action='version', version=version) parser.add_argument('--version', action='version', version=version)
# Content type. # Content type.
############################################# #############################################
@ -175,7 +51,7 @@ parser.add_argument(
prettify = parser.add_mutually_exclusive_group(required=False) prettify = parser.add_mutually_exclusive_group(required=False)
prettify.add_argument( prettify.add_argument(
'--pretty', dest='prettify', action='store_true', '--pretty', dest='prettify', action='store_true',
default=PRETTIFY_STDOUT_TTY_ONLY, default=cliparse.PRETTIFY_STDOUT_TTY_ONLY,
help=_(''' help=_('''
If stdout is a terminal, the response is prettified If stdout is a terminal, the response is prettified
by default (colorized and indented if it is JSON). by default (colorized and indented if it is JSON).
@ -191,7 +67,7 @@ prettify.add_argument(
output_options = parser.add_mutually_exclusive_group(required=False) output_options = parser.add_mutually_exclusive_group(required=False)
output_options.add_argument('--print', '-p', dest='output_options', output_options.add_argument('--print', '-p', dest='output_options',
default=OUT_RESP_HEADERS + OUT_RESP_BODY, default=cliparse.OUT_RESP_HEADERS + cliparse.OUT_RESP_BODY,
help=_(''' help=_('''
String specifying what should the output contain. String specifying what should the output contain.
"{request_headers}" stands for request headers and "{request_headers}" stands for request headers and
@ -201,35 +77,35 @@ output_options.add_argument('--print', '-p', dest='output_options',
Defaults to "hb" which means that the whole response Defaults to "hb" which means that the whole response
(headers and body) is printed. (headers and body) is printed.
'''.format( '''.format(
request_headers=OUT_REQ_HEADERS, request_headers=cliparse.OUT_REQ_HEADERS,
request_body=OUT_REQ_BODY, request_body=cliparse.OUT_REQ_BODY,
response_headers=OUT_RESP_HEADERS, response_headers=cliparse.OUT_RESP_HEADERS,
response_body=OUT_RESP_BODY, response_body=cliparse.OUT_RESP_BODY,
)) ))
) )
output_options.add_argument( output_options.add_argument(
'--verbose', '-v', dest='output_options', '--verbose', '-v', dest='output_options',
action='store_const', const=''.join(OUTPUT_OPTIONS), action='store_const', const=''.join(cliparse.OUTPUT_OPTIONS),
help=_(''' help=_('''
Print the whole request as well as response. Print the whole request as well as response.
Shortcut for --print={0}. Shortcut for --print={0}.
'''.format(''.join(OUTPUT_OPTIONS))) '''.format(''.join(cliparse.OUTPUT_OPTIONS)))
) )
output_options.add_argument( output_options.add_argument(
'--headers', '-t', dest='output_options', '--headers', '-t', dest='output_options',
action='store_const', const=OUT_RESP_HEADERS, action='store_const', const=cliparse.OUT_RESP_HEADERS,
help=_(''' help=_('''
Print only the response headers. Print only the response headers.
Shortcut for --print={0}. Shortcut for --print={0}.
'''.format(OUT_RESP_HEADERS)) '''.format(cliparse.OUT_RESP_HEADERS))
) )
output_options.add_argument( output_options.add_argument(
'--body', '-b', dest='output_options', '--body', '-b', dest='output_options',
action='store_const', const=OUT_RESP_BODY, action='store_const', const=cliparse.OUT_RESP_BODY,
help=_(''' help=_('''
Print only the response body. Print only the response body.
Shortcut for --print={0}. Shortcut for --print={0}.
'''.format(OUT_RESP_BODY)) '''.format(cliparse.OUT_RESP_BODY))
) )
parser.add_argument( parser.add_argument(
@ -243,7 +119,7 @@ parser.add_argument(
# ``requests.request`` keyword arguments. # ``requests.request`` keyword arguments.
parser.add_argument( parser.add_argument(
'--auth', '-a', help='username:password', '--auth', '-a', help='username:password',
type=KeyValueType(SEP_COMMON) type=cliparse.KeyValueType(cliparse.SEP_COMMON)
) )
parser.add_argument( parser.add_argument(
@ -263,7 +139,7 @@ parser.add_argument(
) )
parser.add_argument( parser.add_argument(
'--proxy', default=[], action='append', '--proxy', default=[], action='append',
type=KeyValueType(SEP_COMMON), type=cliparse.KeyValueType(cliparse.SEP_COMMON),
help=_(''' help=_('''
String mapping protocol to the URL of the proxy String mapping protocol to the URL of the proxy
(e.g. http:foo.bar:3128). (e.g. http:foo.bar:3128).
@ -304,7 +180,12 @@ parser.add_argument(
) )
parser.add_argument( parser.add_argument(
'items', nargs='*', 'items', nargs='*',
type=KeyValueType(SEP_COMMON, SEP_DATA, SEP_DATA_RAW_JSON, SEP_FILES), type=cliparse.KeyValueType(
cliparse.SEP_COMMON,
cliparse.SEP_DATA,
cliparse.SEP_DATA_RAW_JSON,
cliparse.SEP_FILES
),
help=_(''' help=_('''
HTTP header (header:value), data field (field=value), HTTP header (header:value), data field (field=value),
raw JSON field (field:=value) raw JSON field (field:=value)

165
httpie/cliparse.py Normal file
View File

@ -0,0 +1,165 @@
"""
CLI argument parsing logic.
"""
import os
import json
import re
from collections import namedtuple
try:
from collections import OrderedDict
except ImportError:
OrderedDict = dict
import argparse
from requests.structures import CaseInsensitiveDict
from . import __version__
SEP_COMMON = ':'
SEP_HEADERS = SEP_COMMON
SEP_DATA = '='
SEP_DATA_RAW_JSON = ':='
SEP_FILES = '@'
OUT_REQ_HEADERS = 'H'
OUT_REQ_BODY = 'B'
OUT_RESP_HEADERS = 'h'
OUT_RESP_BODY = 'b'
OUTPUT_OPTIONS = [OUT_REQ_HEADERS,
OUT_REQ_BODY,
OUT_RESP_HEADERS,
OUT_RESP_BODY]
PRETTIFY_STDOUT_TTY_ONLY = object()
DEFAULT_UA = 'HTTPie/%s' % __version__
class HTTPieArgumentParser(argparse.ArgumentParser):
def parse_args(self, args=None, namespace=None):
args = super(HTTPieArgumentParser, self).parse_args(args, namespace)
self._validate_output_options(args)
self._validate_auth_options(args)
self._parse_items(args)
return args
def _parse_items(self, args):
args.headers = CaseInsensitiveDict()
args.headers['User-Agent'] = DEFAULT_UA
args.data = OrderedDict()
args.files = OrderedDict()
try:
parse_items(items=args.items, headers=args.headers,
data=args.data, files=args.files)
except ParseError as e:
if args.traceback:
raise
self.error(e.message)
if args.files and not args.form:
# We could just switch to --form automatically here,
# but I think it's better to make it explicit.
self.error(
' You need to set the --form / -f flag to'
' to issue a multipart request. File fields: %s'
% ','.join(args.files.keys()))
def _validate_output_options(self, args):
unknown_output_options = set(args.output_options) - set(OUTPUT_OPTIONS)
if unknown_output_options:
self.error('Unknown output options: %s' % ','.join(unknown_output_options))
def _validate_auth_options(self, args):
if args.auth_type and not args.auth:
self.error('--auth-type can only be used with --auth')
class ParseError(Exception):
pass
KeyValue = namedtuple('KeyValue', ['key', 'value', 'sep', 'orig'])
class KeyValueType(object):
"""A type used with `argparse`."""
def __init__(self, *separators):
self.separators = separators
self.escapes = ['\\\\' + sep for sep in separators]
def __call__(self, string):
found = {}
found_escapes = []
for esc in self.escapes:
found_escapes += [m.span() for m in re.finditer(esc, string)]
for sep in self.separators:
matches = re.finditer(sep, string)
for match in matches:
start, end = match.span()
inside_escape = False
for estart, eend in found_escapes:
if start >= estart and end <= eend:
inside_escape = True
break
if not inside_escape:
found[start] = sep
if not found:
raise argparse.ArgumentTypeError(
'"%s" is not a valid value' % string)
# split the string at the earliest non-escaped separator.
seploc = min(found.keys())
sep = found[seploc]
key = string[:seploc]
value = string[seploc + len(sep):]
# remove escape chars
for sepstr in self.separators:
key = key.replace('\\' + sepstr, sepstr)
value = value.replace('\\' + sepstr, sepstr)
return KeyValue(key=key, value=value, sep=sep, orig=string)
def parse_items(items, data=None, headers=None, files=None):
"""Parse `KeyValueType` `items` into `data`, `headers` and `files`."""
if headers is None:
headers = {}
if data is None:
data = {}
if files is None:
files = {}
for item in items:
value = item.value
key = item.key
if item.sep == SEP_HEADERS:
target = headers
elif item.sep == SEP_FILES:
try:
value = open(os.path.expanduser(item.value), 'r')
except IOError as e:
raise ParseError(
'Invalid argument %r. %s' % (item.orig, e))
if not key:
key = os.path.basename(value.name)
target = files
elif item.sep in [SEP_DATA, SEP_DATA_RAW_JSON]:
if item.sep == SEP_DATA_RAW_JSON:
try:
value = json.loads(item.value)
except ValueError:
raise ParseError('%s is not valid JSON' % item.orig)
target = data
else:
raise ParseError('%s is not valid item' % item.orig)
if key in target:
ParseError('duplicate item %s (%s)' % (item.key, item.orig))
target[key] = value
return headers, data, files

66
httpie/httpmessage.py Normal file
View File

@ -0,0 +1,66 @@
from requests.compat import urlparse
class HTTPMessage(object):
"""Model representing an HTTP message."""
def __init__(self, line, headers, body, content_type=None):
# {Request,Status}-Line
self.line = line
self.headers = headers
self.body = body
self.content_type = content_type
def from_request(request):
"""Make an `HTTPMessage` from `requests.models.Request`."""
url = urlparse(request.url)
request_headers = dict(request.headers)
if 'Host' not in request_headers:
request_headers['Host'] = url.netloc
return HTTPMessage(
line='{method} {path} HTTP/1.1'.format(
method=request.method,
path=url.path or '/'),
headers='\n'.join(str('%s: %s') % (name, value)
for name, value
in request_headers.items()),
body=request._enc_data,
content_type=request_headers.get('Content-Type')
)
def from_response(response):
"""Make an `HTTPMessage` from `requests.models.Response`."""
encoding = response.encoding or 'ISO-8859-1'
original = response.raw._original_response
response_headers = response.headers
return HTTPMessage(
line='HTTP/{version} {status} {reason}'.format(
version='.'.join(str(original.version)),
status=original.status, reason=original.reason,),
headers=str(original.msg),
body=response.content.decode(encoding) if response.content else '',
content_type=response_headers.get('Content-Type'))
def format(message, prettifier=None,
with_headers=True, with_body=True):
"""Return a `unicode` representation of `message`. """
bits = []
if with_headers:
if prettifier:
bits.append(prettifier.headers(message.line))
bits.append(prettifier.headers(message.headers))
else:
bits.append(message.line)
bits.append(message.headers)
if with_body and message.body:
bits.append('\n')
if with_body and message.body:
if prettifier and message.content_type:
bits.append(prettifier.body(message.body, message.content_type))
else:
bits.append(message.body)
bits.append('\n')
return '\n'.join(bit.strip() for bit in bits)

View File

@ -10,7 +10,7 @@ import tempfile
TESTS_ROOT = os.path.dirname(__file__) TESTS_ROOT = os.path.dirname(__file__)
sys.path.insert(0, os.path.realpath(os.path.join(TESTS_ROOT, '..'))) sys.path.insert(0, os.path.realpath(os.path.join(TESTS_ROOT, '..')))
from httpie import __main__ from httpie import __main__
from httpie import cli from httpie import cliparse
TEST_FILE = os.path.join(TESTS_ROOT, 'file.txt') TEST_FILE = os.path.join(TESTS_ROOT, 'file.txt')
@ -48,28 +48,28 @@ class BaseTest(unittest.TestCase):
class TestItemParsing(BaseTest): class TestItemParsing(BaseTest):
def setUp(self): def setUp(self):
self.kv = cli.KeyValueType( self.key_value_type = cliparse.KeyValueType(
cli.SEP_HEADERS, cliparse.SEP_HEADERS,
cli.SEP_DATA, cliparse.SEP_DATA,
cli.SEP_DATA_RAW_JSON, cliparse.SEP_DATA_RAW_JSON,
cli.SEP_FILES, cliparse.SEP_FILES,
) )
def test_invalid_items(self): def test_invalid_items(self):
items = ['no-separator'] items = ['no-separator']
for item in items: for item in items:
self.assertRaises(argparse.ArgumentTypeError, self.assertRaises(argparse.ArgumentTypeError,
lambda: self.kv(item)) lambda: self.key_value_type(item))
def test_escape(self): def test_escape(self):
headers, data, files = cli.parse_items([ headers, data, files = cliparse.parse_items([
# headers # headers
self.kv('foo\\:bar:baz'), self.key_value_type('foo\\:bar:baz'),
self.kv('jack\\@jill:hill'), self.key_value_type('jack\\@jill:hill'),
# data # data
self.kv('baz\\=bar=foo'), self.key_value_type('baz\\=bar=foo'),
# files # files
self.kv('bar\\@baz@%s' % TEST_FILE) self.key_value_type('bar\\@baz@%s' % TEST_FILE)
]) ])
self.assertDictEqual(headers, { self.assertDictEqual(headers, {
'foo:bar': 'baz', 'foo:bar': 'baz',
@ -81,23 +81,23 @@ class TestItemParsing(BaseTest):
self.assertIn('bar@baz', files) self.assertIn('bar@baz', files)
def test_escape_longsep(self): def test_escape_longsep(self):
headers, data, files = cli.parse_items([ headers, data, files = cliparse.parse_items([
self.kv('bob\\:==foo'), self.key_value_type('bob\\:==foo'),
]) ])
self.assertDictEqual(data, { self.assertDictEqual(data, {
'bob:=': 'foo', 'bob:=': 'foo',
}) })
def test_valid_items(self): def test_valid_items(self):
headers, data, files = cli.parse_items([ headers, data, files = cliparse.parse_items([
self.kv('string=value'), self.key_value_type('string=value'),
self.kv('header:value'), self.key_value_type('header:value'),
self.kv('list:=["a", 1, {}, false]'), self.key_value_type('list:=["a", 1, {}, false]'),
self.kv('obj:={"a": "b"}'), self.key_value_type('obj:={"a": "b"}'),
self.kv('eh:'), self.key_value_type('eh:'),
self.kv('ed='), self.key_value_type('ed='),
self.kv('bool:=true'), self.key_value_type('bool:=true'),
self.kv('test-file@%s' % TEST_FILE), self.key_value_type('test-file@%s' % TEST_FILE),
]) ])
self.assertDictEqual(headers, { self.assertDictEqual(headers, {
'header': 'value', 'header': 'value',
@ -163,7 +163,7 @@ class TestPrettyFlag(BaseTest):
class TestFileUpload(BaseTest): class TestFileUpload(BaseTest):
def test_non_existent_file_raises_parse_error(self): def test_non_existent_file_raises_parse_error(self):
self.assertRaises(cli.ParseError, http, self.assertRaises(cliparse.ParseError, http,
'--form', '--traceback', '--form', '--traceback',
'POST', 'http://httpbin.org/post', 'POST', 'http://httpbin.org/post',
'foo@/__does_not_exist__') 'foo@/__does_not_exist__')