You've already forked httpie-cli
mirror of
https://github.com/httpie/cli.git
synced 2025-08-10 22:42:05 +02:00
Implement support for multiple headers with the same name in sessions (#1335)
* Properly remove duplicate Cookie headers * Implement support for multiple headers with the same name in sessions * More testing * Cleanup * Remove duplicated test, cleanup * Fix pycodestyle * CHANGELOG Co-authored-by: Jakub Roztocil <jakub@roztocil.co>
This commit is contained in:
@@ -35,6 +35,16 @@ class HTTPHeadersDict(CIMultiDict, BaseMultiDict):
|
||||
|
||||
super().add(key, value)
|
||||
|
||||
def remove_item(self, key, value):
|
||||
"""
|
||||
Remove a (key, value) pair from the dict.
|
||||
"""
|
||||
existing_values = self.popall(key)
|
||||
existing_values.remove(value)
|
||||
|
||||
for value in existing_values:
|
||||
self.add(key, value)
|
||||
|
||||
|
||||
class RequestJSONDataDict(OrderedDict):
|
||||
pass
|
||||
|
@@ -4,6 +4,7 @@ from typing import Any, Type, List, Dict, TYPE_CHECKING
|
||||
if TYPE_CHECKING:
|
||||
from httpie.sessions import Session
|
||||
|
||||
|
||||
INSECURE_COOKIE_JAR_WARNING = '''\
|
||||
Outdated layout detected for the current session. Please consider updating it,
|
||||
in order to not get affected by potential security problems.
|
||||
@@ -53,16 +54,12 @@ def pre_process(session: 'Session', cookies: Any) -> List[Dict[str, Any]]:
|
||||
for cookie in normalized_cookies
|
||||
)
|
||||
|
||||
if should_issue_warning and not session.refactor_mode:
|
||||
if should_issue_warning:
|
||||
warning = INSECURE_COOKIE_JAR_WARNING.format(hostname=session.bound_host, session_id=session.session_id)
|
||||
if not session.is_anonymous:
|
||||
warning += INSECURE_COOKIE_JAR_WARNING_FOR_NAMED_SESSIONS
|
||||
warning += INSECURE_COOKIE_SECURITY_LINK
|
||||
|
||||
session.env.log_error(
|
||||
warning,
|
||||
level='warning'
|
||||
)
|
||||
session.warn_legacy_usage(warning)
|
||||
|
||||
return normalized_cookies
|
||||
|
73
httpie/legacy/v3_2_0_session_header_format.py
Normal file
73
httpie/legacy/v3_2_0_session_header_format.py
Normal file
@@ -0,0 +1,73 @@
|
||||
from typing import Any, Type, List, Dict, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from httpie.sessions import Session
|
||||
|
||||
|
||||
OLD_HEADER_STORE_WARNING = '''\
|
||||
Outdated layout detected for the current session. Please consider updating it,
|
||||
in order to use the latest features regarding the header layout.
|
||||
|
||||
For fixing the current session:
|
||||
|
||||
$ httpie cli sessions upgrade {hostname} {session_id}
|
||||
'''
|
||||
|
||||
OLD_HEADER_STORE_WARNING_FOR_NAMED_SESSIONS = '''\
|
||||
|
||||
For fixing all named sessions:
|
||||
|
||||
$ httpie cli sessions upgrade-all
|
||||
'''
|
||||
|
||||
OLD_HEADER_STORE_LINK = '\nSee $INSERT_LINK for more information.'
|
||||
|
||||
|
||||
def pre_process(session: 'Session', headers: Any) -> List[Dict[str, Any]]:
|
||||
"""Serialize the headers into a unified form and issue a warning if
|
||||
the session file is using the old layout."""
|
||||
|
||||
is_old_style = isinstance(headers, dict)
|
||||
if is_old_style:
|
||||
normalized_headers = list(headers.items())
|
||||
else:
|
||||
normalized_headers = [
|
||||
(item['name'], item['value'])
|
||||
for item in headers
|
||||
]
|
||||
|
||||
if is_old_style:
|
||||
warning = OLD_HEADER_STORE_WARNING.format(hostname=session.bound_host, session_id=session.session_id)
|
||||
if not session.is_anonymous:
|
||||
warning += OLD_HEADER_STORE_WARNING_FOR_NAMED_SESSIONS
|
||||
warning += OLD_HEADER_STORE_LINK
|
||||
session.warn_legacy_usage(warning)
|
||||
|
||||
return normalized_headers
|
||||
|
||||
|
||||
def post_process(
|
||||
normalized_headers: List[Dict[str, Any]],
|
||||
*,
|
||||
original_type: Type[Any]
|
||||
) -> Any:
|
||||
"""Deserialize given header store into the original form it was
|
||||
used in."""
|
||||
|
||||
if issubclass(original_type, dict):
|
||||
# For the legacy behavior, preserve the last value.
|
||||
return {
|
||||
item['name']: item['value']
|
||||
for item in normalized_headers
|
||||
}
|
||||
else:
|
||||
return normalized_headers
|
||||
|
||||
|
||||
def fix_layout(session: 'Session', *args, **kwargs) -> None:
|
||||
from httpie.sessions import materialize_headers
|
||||
|
||||
if not isinstance(session['headers'], dict):
|
||||
return None
|
||||
|
||||
session['headers'] = materialize_headers(session['headers'])
|
@@ -4,10 +4,16 @@ from typing import Tuple
|
||||
from httpie.sessions import SESSIONS_DIR_NAME, get_httpie_session
|
||||
from httpie.status import ExitStatus
|
||||
from httpie.context import Environment
|
||||
from httpie.legacy import cookie_format as legacy_cookies
|
||||
from httpie.legacy import v3_1_0_session_cookie_format, v3_2_0_session_header_format
|
||||
from httpie.manager.cli import missing_subcommand, parser
|
||||
|
||||
|
||||
FIXERS_TO_VERSIONS = {
|
||||
'3.1.0': v3_1_0_session_cookie_format.fix_layout,
|
||||
'3.2.0': v3_2_0_session_header_format.fix_layout,
|
||||
}
|
||||
|
||||
|
||||
def cli_sessions(env: Environment, args: argparse.Namespace) -> ExitStatus:
|
||||
action = args.cli_sessions_action
|
||||
if action is None:
|
||||
@@ -22,7 +28,7 @@ def cli_sessions(env: Environment, args: argparse.Namespace) -> ExitStatus:
|
||||
|
||||
|
||||
def is_version_greater(version_1: str, version_2: str) -> bool:
|
||||
# In an ideal scenerio, we would depend on `packaging` in order
|
||||
# In an ideal scenario, we would depend on `packaging` in order
|
||||
# to offer PEP 440 compatible parsing. But since it might not be
|
||||
# commonly available for outside packages, and since we are only
|
||||
# going to parse HTTPie's own version it should be fine to compare
|
||||
@@ -40,11 +46,6 @@ def is_version_greater(version_1: str, version_2: str) -> bool:
|
||||
return split_version(version_1) > split_version(version_2)
|
||||
|
||||
|
||||
FIXERS_TO_VERSIONS = {
|
||||
'3.1.0': legacy_cookies.fix_layout
|
||||
}
|
||||
|
||||
|
||||
def upgrade_session(env: Environment, args: argparse.Namespace, hostname: str, session_name: str):
|
||||
session = get_httpie_session(
|
||||
env=env,
|
||||
@@ -52,7 +53,7 @@ def upgrade_session(env: Environment, args: argparse.Namespace, hostname: str, s
|
||||
session_name=session_name,
|
||||
host=hostname,
|
||||
url=hostname,
|
||||
refactor_mode=True
|
||||
suppress_legacy_warnings=True
|
||||
)
|
||||
|
||||
session_name = session.path.stem
|
||||
|
@@ -13,12 +13,16 @@ from typing import Any, Dict, List, Optional, Union
|
||||
from requests.auth import AuthBase
|
||||
from requests.cookies import RequestsCookieJar, remove_cookie_by_name
|
||||
|
||||
from .context import Environment
|
||||
from .context import Environment, Levels
|
||||
from .cli.dicts import HTTPHeadersDict
|
||||
from .config import BaseConfigDict, DEFAULT_CONFIG_DIR
|
||||
from .utils import url_as_host
|
||||
from .plugins.registry import plugin_manager
|
||||
from .legacy import cookie_format as legacy_cookies
|
||||
|
||||
from .legacy import (
|
||||
v3_1_0_session_cookie_format as legacy_cookies,
|
||||
v3_2_0_session_header_format as legacy_headers
|
||||
)
|
||||
|
||||
|
||||
SESSIONS_DIR_NAME = 'sessions'
|
||||
@@ -67,6 +71,23 @@ def materialize_cookie(cookie: Cookie) -> Dict[str, Any]:
|
||||
return materialized_cookie
|
||||
|
||||
|
||||
def materialize_cookies(jar: RequestsCookieJar) -> List[Dict[str, Any]]:
|
||||
return [
|
||||
materialize_cookie(cookie)
|
||||
for cookie in jar
|
||||
]
|
||||
|
||||
|
||||
def materialize_headers(headers: Dict[str, str]) -> List[Dict[str, Any]]:
|
||||
return [
|
||||
{
|
||||
'name': name,
|
||||
'value': value
|
||||
}
|
||||
for name, value in headers.copy().items()
|
||||
]
|
||||
|
||||
|
||||
def get_httpie_session(
|
||||
env: Environment,
|
||||
config_dir: Path,
|
||||
@@ -74,7 +95,7 @@ def get_httpie_session(
|
||||
host: Optional[str],
|
||||
url: str,
|
||||
*,
|
||||
refactor_mode: bool = False
|
||||
suppress_legacy_warnings: bool = False
|
||||
) -> 'Session':
|
||||
bound_hostname = host or url_as_host(url)
|
||||
if not bound_hostname:
|
||||
@@ -93,7 +114,7 @@ def get_httpie_session(
|
||||
env=env,
|
||||
session_id=session_id,
|
||||
bound_host=strip_port(bound_hostname),
|
||||
refactor_mode=refactor_mode
|
||||
suppress_legacy_warnings=suppress_legacy_warnings
|
||||
)
|
||||
session.load()
|
||||
return session
|
||||
@@ -109,30 +130,29 @@ class Session(BaseConfigDict):
|
||||
env: Environment,
|
||||
bound_host: str,
|
||||
session_id: str,
|
||||
refactor_mode: bool = False,
|
||||
suppress_legacy_warnings: bool = False,
|
||||
):
|
||||
super().__init__(path=Path(path))
|
||||
self['headers'] = {}
|
||||
|
||||
# Default values for the session files
|
||||
self['headers'] = []
|
||||
self['cookies'] = []
|
||||
self['auth'] = {
|
||||
'type': None,
|
||||
'username': None,
|
||||
'password': None
|
||||
}
|
||||
|
||||
# Runtime state of the Session objects.
|
||||
self.env = env
|
||||
self._headers = HTTPHeadersDict()
|
||||
self.cookie_jar = RequestsCookieJar()
|
||||
self.session_id = session_id
|
||||
self.bound_host = bound_host
|
||||
self.refactor_mode = refactor_mode
|
||||
self.suppress_legacy_warnings = suppress_legacy_warnings
|
||||
|
||||
def pre_process_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
cookies = data.get('cookies')
|
||||
if cookies:
|
||||
normalized_cookies = legacy_cookies.pre_process(self, cookies)
|
||||
else:
|
||||
normalized_cookies = []
|
||||
|
||||
for cookie in normalized_cookies:
|
||||
def _add_cookies(self, cookies: List[Dict[str, Any]]) -> None:
|
||||
for cookie in cookies:
|
||||
domain = cookie.get('domain', '')
|
||||
if domain is None:
|
||||
# domain = None means explicitly lack of cookie, though
|
||||
@@ -143,29 +163,38 @@ class Session(BaseConfigDict):
|
||||
|
||||
self.cookie_jar.set(**cookie)
|
||||
|
||||
def pre_process_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
for key, deserializer, importer in [
|
||||
('cookies', legacy_cookies.pre_process, self._add_cookies),
|
||||
('headers', legacy_headers.pre_process, self._headers.update),
|
||||
]:
|
||||
values = data.get(key)
|
||||
if values:
|
||||
normalized_values = deserializer(self, values)
|
||||
else:
|
||||
normalized_values = []
|
||||
|
||||
importer(normalized_values)
|
||||
|
||||
return data
|
||||
|
||||
def post_process_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
cookies = data.get('cookies')
|
||||
for key, store, serializer, exporter in [
|
||||
('cookies', self.cookie_jar, materialize_cookies, legacy_cookies.post_process),
|
||||
('headers', self._headers, materialize_headers, legacy_headers.post_process),
|
||||
]:
|
||||
original_type = type(data.get(key))
|
||||
values = serializer(store)
|
||||
|
||||
normalized_cookies = [
|
||||
materialize_cookie(cookie)
|
||||
for cookie in self.cookie_jar
|
||||
]
|
||||
data['cookies'] = legacy_cookies.post_process(
|
||||
normalized_cookies,
|
||||
original_type=type(cookies)
|
||||
)
|
||||
data[key] = exporter(
|
||||
values,
|
||||
original_type=original_type
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
def update_headers(self, request_headers: HTTPHeadersDict):
|
||||
"""
|
||||
Update the session headers with the request ones while ignoring
|
||||
certain name prefixes.
|
||||
|
||||
"""
|
||||
headers = self.headers
|
||||
def _compute_new_headers(self, request_headers: HTTPHeadersDict) -> HTTPHeadersDict:
|
||||
new_headers = HTTPHeadersDict()
|
||||
for name, value in request_headers.copy().items():
|
||||
if value is None:
|
||||
continue # Ignore explicitly unset headers
|
||||
@@ -183,24 +212,40 @@ class Session(BaseConfigDict):
|
||||
morsel['path'] = DEFAULT_COOKIE_PATH
|
||||
self.cookie_jar.set(cookie_name, morsel)
|
||||
|
||||
all_cookie_headers = request_headers.getall(name)
|
||||
if len(all_cookie_headers) > 1:
|
||||
all_cookie_headers.remove(original_value)
|
||||
else:
|
||||
request_headers.popall(name)
|
||||
request_headers.remove_item(name, original_value)
|
||||
continue
|
||||
|
||||
for prefix in SESSION_IGNORED_HEADER_PREFIXES:
|
||||
if name.lower().startswith(prefix.lower()):
|
||||
break
|
||||
else:
|
||||
headers[name] = value
|
||||
new_headers.add(name, value)
|
||||
|
||||
self['headers'] = dict(headers)
|
||||
return new_headers
|
||||
|
||||
def update_headers(self, request_headers: HTTPHeadersDict):
|
||||
"""
|
||||
Update the session headers with the request ones while ignoring
|
||||
certain name prefixes.
|
||||
|
||||
"""
|
||||
|
||||
new_headers = self._compute_new_headers(request_headers)
|
||||
new_keys = new_headers.copy().keys()
|
||||
|
||||
# New headers will take priority over the existing ones, and override
|
||||
# them directly instead of extending them.
|
||||
for key, value in self._headers.copy().items():
|
||||
if key in new_keys:
|
||||
continue
|
||||
|
||||
new_headers.add(key, value)
|
||||
|
||||
self._headers = new_headers
|
||||
|
||||
@property
|
||||
def headers(self) -> HTTPHeadersDict:
|
||||
return HTTPHeadersDict(self['headers'])
|
||||
return self._headers.copy()
|
||||
|
||||
@property
|
||||
def cookies(self) -> RequestsCookieJar:
|
||||
@@ -257,3 +302,17 @@ class Session(BaseConfigDict):
|
||||
@property
|
||||
def is_anonymous(self):
|
||||
return is_anonymous_session(self.session_id)
|
||||
|
||||
def warn_legacy_usage(self, warning: str) -> None:
|
||||
if self.suppress_legacy_warnings:
|
||||
return None
|
||||
|
||||
self.env.log_error(
|
||||
warning,
|
||||
level=Levels.WARNING
|
||||
)
|
||||
|
||||
# We don't want to spam multiple warnings on each usage,
|
||||
# so if there is already a warning for the legacy usage
|
||||
# we'll skip the next ones.
|
||||
self.suppress_legacy_warnings = True
|
||||
|
Reference in New Issue
Block a user