1
0
mirror of https://github.com/httpie/cli.git synced 2026-04-24 19:53:55 +02:00
Files
httpie-cli/httpie/models.py
T

209 lines
5.4 KiB
Python
Raw Normal View History

import requests
from enum import Enum, auto
from typing import Iterable, Union, NamedTuple
2019-08-29 08:53:56 +02:00
from urllib.parse import urlsplit
2012-09-17 00:37:36 +02:00
from .cli.constants import (
OUT_REQ_BODY,
OUT_REQ_HEAD,
OUT_RESP_BODY,
OUT_RESP_HEAD,
OUT_RESP_META
)
from .compat import cached_property
from .utils import split_cookies, parse_content_type_header
2012-07-21 02:59:43 +02:00
class HTTPMessage:
    """Abstract class for HTTP messages.

    Wraps an underlying message object (kept as ``self._orig``) and declares
    the interface that the concrete request/response wrappers implement.
    """

    def __init__(self, orig):
        # The wrapped message; every accessor below reads from it.
        self._orig = orig

    def iter_body(self, chunk_size: int) -> Iterable[bytes]:
        """Return an iterator over the body."""
        raise NotImplementedError

    def iter_lines(self, chunk_size: int) -> Iterable[tuple]:
        """Return an iterator over the body yielding (`line`, `line_feed`)."""
        raise NotImplementedError

    @property
    def headers(self) -> str:
        """Return a `str` with the message's headers."""
        raise NotImplementedError

    @property
    def metadata(self) -> str:
        """Return metadata about the current message."""
        raise NotImplementedError

    @cached_property
    def encoding(self) -> str:
        """Return the ``charset`` parameter of the content type, or ``''``."""
        # Only the parameters are needed; the media type itself is discarded.
        _, params = parse_content_type_header(self.content_type)
        return params.get('charset', '')

    @property
    def content_type(self) -> str:
        """Return the message content type.

        An absent ``Content-Type`` header yields ``''``; a non-``str`` header
        value is decoded with the default codec so a `str` is always returned.
        """
        ct = self._orig.headers.get('Content-Type', '')
        if not isinstance(ct, str):
            ct = ct.decode()
        return ct
class HTTPResponse(HTTPMessage):
    """A :class:`requests.models.Response` wrapper."""

    def iter_body(self, chunk_size=1):
        """Return an iterator over the body.

        Delegates to the wrapped response's ``iter_content``.
        """
        return self._orig.iter_content(chunk_size=chunk_size)

    def iter_lines(self, chunk_size):
        """Return an iterator over the body yielding (`line`, `line_feed`).

        Every line from the wrapped response is paired with ``b'\\n'``.
        """
        return ((line, b'\n') for line in self._orig.iter_lines(chunk_size))

    # noinspection PyProtectedMember
    @property
    def headers(self):
        """Return the status line and headers joined with CRLF.

        ``Set-Cookie`` is emitted last, one line per cookie as split by
        ``split_cookies``; all other headers keep their original order.
        """
        try:
            raw_version = self._orig.raw._original_response.version
        except AttributeError:
            # Assume HTTP/1.1
            raw_version = 11

        # An unrecognised raw version (including a missing/None one) is
        # reported as HTTP/1.1 as well, instead of failing with a KeyError.
        version = {
            9: '0.9',
            10: '1.0',
            11: '1.1',
            20: '2',
        }.get(raw_version, '1.1')

        original = self._orig
        status_line = f'HTTP/{version} {original.status_code} {original.reason}'
        headers = [status_line]
        headers.extend(
            ': '.join(header)
            for header in original.headers.items()
            if header[0] != 'Set-Cookie'
        )
        # Filter before splitting so that only the Set-Cookie value is ever
        # passed to split_cookies().
        headers.extend(
            f'Set-Cookie: {cookie}'
            for header, value in original.headers.items()
            if header == 'Set-Cookie'
            for cookie in split_cookies(value)
        )
        return '\r\n'.join(headers)

    @property
    def metadata(self) -> str:
        """Return ``key: value`` metadata lines (currently the elapsed time)."""
        data = {}
        data['Elapsed time'] = str(self._orig.elapsed.total_seconds()) + 's'
        return '\n'.join(
            f'{key}: {value}'
            for key, value in data.items()
        )
class HTTPRequest(HTTPMessage):
    """A :class:`requests.models.Request` wrapper."""

    def iter_body(self, chunk_size):
        """Yield the whole body as one chunk; `chunk_size` is not used."""
        yield self.body

    def iter_lines(self, chunk_size):
        """Yield the whole body once, paired with an empty line feed."""
        yield self.body, b''

    @property
    def headers(self):
        """Return the request line and headers joined with CRLF.

        A ``Host`` header derived from the URL is added when the wrapped
        request does not carry one.
        """
        parsed = urlsplit(self._orig.url)
        path = parsed.path or '/'
        query = f'?{parsed.query}' if parsed.query else ''
        lines = [f'{self._orig.method} {path}{query} HTTP/1.1']

        fields = self._orig.headers.copy()
        if 'Host' not in self._orig.headers:
            # Keep only what follows the last '@' of the network location.
            fields['Host'] = parsed.netloc.rpartition('@')[2]

        for name, value in fields.items():
            if not isinstance(value, str):
                value = value.decode()
            lines.append(f'{name}: {value}')

        return '\r\n'.join(lines).strip()

    @property
    def body(self):
        """Return the request body, or ``b''`` when it is empty or missing."""
        payload = self._orig.body
        if isinstance(payload, str):
            # Happens with JSON/form request data parsed from the command line.
            payload = payload.encode()
        return payload if payload else b''
# The two `requests` message types told apart by infer_requests_message_kind().
RequestsMessage = Union[requests.PreparedRequest, requests.Response]
class RequestsMessageKind(Enum):
    """Whether a `requests` message is a request or a response."""

    REQUEST = auto()
    RESPONSE = auto()
def infer_requests_message_kind(message: RequestsMessage) -> RequestsMessageKind:
    """Classify `message` as a request or a response.

    Raises:
        TypeError: if `message` is neither a ``requests.PreparedRequest``
            nor a ``requests.Response``.
    """
    # Checked in order: prepared requests first, then responses.
    known_kinds = (
        (requests.PreparedRequest, RequestsMessageKind.REQUEST),
        (requests.Response, RequestsMessageKind.RESPONSE),
    )
    for message_type, kind in known_kinds:
        if isinstance(message, message_type):
            return kind
    raise TypeError(f"Unexpected message type: {type(message).__name__}")
# For each message kind, maps an output option name to the OUT_* constant
# that OutputOptions.from_message() looks for in its `raw_args` string.
# Only responses have a 'meta' option.
OPTION_TO_PARAM = {
    RequestsMessageKind.REQUEST: {
        'headers': OUT_REQ_HEAD,
        'body': OUT_REQ_BODY,
    },
    RequestsMessageKind.RESPONSE: {
        'headers': OUT_RESP_HEAD,
        'body': OUT_RESP_BODY,
        'meta': OUT_RESP_META
    }
}
class OutputOptions(NamedTuple):
    """Which parts of a request or response message are to be output."""

    kind: RequestsMessageKind
    headers: bool
    body: bool
    meta: bool = False

    def any(self):
        """Return a truthy value if headers, body or meta is enabled."""
        return self.headers or self.body or self.meta

    @classmethod
    def from_message(
        cls,
        message: RequestsMessage,
        raw_args: str = '',
        **kwargs
    ):
        """Build the options for `message` from `raw_args`.

        An option is switched on when its OUT_* parameter for the message's
        kind occurs in `raw_args`. Entries in `kwargs` override the values
        derived that way.
        """
        kind = infer_requests_message_kind(message)
        derived = {}
        for option, param in OPTION_TO_PARAM[kind].items():
            derived[option] = param in raw_args
        # Explicit keyword arguments take precedence over derived flags.
        merged = {**derived, **kwargs}
        return cls(kind=kind, **merged)