1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-06-17 00:07:37 +02:00

add http_headers and curl_head parsers

This commit is contained in:
Kelly Brazil
2024-02-04 11:17:01 -08:00
parent 29689f7603
commit 39f43aad63
3 changed files with 731 additions and 0 deletions

View File

@ -33,6 +33,7 @@ parsers: List[str] = [
'crontab-u',
'csv',
'csv-s',
'curl-head',
'date',
'datetime-iso',
'debconf-show',
@ -62,6 +63,7 @@ parsers: List[str] = [
'history',
'host',
'hosts',
'http-headers',
'id',
'ifconfig',
'ini',

285
jc/parsers/curl_head.py Normal file
View File

@ -0,0 +1,285 @@
"""jc - JSON Convert `curl --head` command output parser
This parser converts standard and verbose `curl --head` output.
When converting verbose output from `curl` (to gather request headers, for
example) you will need to redirect STDERR to STDOUT with `2>&1`. The magic
syntax will not work in this case.
Usage (cli):
$ curl --head www.example.com | jc --curl-head
$ curl -Ivs www.example.com 2>&1 | jc --curl-head
or
$ jc curl --head www.example.com
Usage (module):
import jc
result = jc.parse('curl_head', curl_head_command_output)
Schema:
[
{
"<header>": string,
"accept": [
string
],
"accept-ch": [
string
],
"accept-ch-lifetime": integer,
"accept-encoding": [
string
],
"accept-language": [
string
],
"accept-patch": [
string
],
"accept-post": [
string
],
"accept-ranges": [
string
],
"access-control-allow-headers": [
string
],
"access-control-allow-methods": [
string
],
"access-control-expose-headers": [
string
],
"access-control-max-age": integer,
"access-control-request-headers": [
string
],
"age": integer,
"allow": [
string
],
"alt-svc": [
string
],
"cache-control": [
string
],
"clear-site-data": [
string
],
"connection": [
string
],
"content-encoding": [
string
],
"content-dpr": integer,
"content-language": [
string
],
"content-length": integer,
"content-security-policy": [
string
],
"content-security-policy-report-only": [
string
],
"cookie": [
string
],
"critical-ch": [
string
],
"date": string,
"date_epoch_utc": integer,
"expect-ct": [
string
],
"expires": string,
"expires_epoch_utc": integer,
"device-memory": integer,
"downlink": integer,
"dpr": integer,
"forwarded": [
string
],
"if-match": [
string
],
"if-modified-since": string,
"if-modified-since_epoch_utc": integer,
"if-none-match": [
string
],
"if-range": string,
"if-range_epoch_utc": integer,
"if-unmodified-since": string,
"if-unmodified-since_epoch_utc": integer,
"im": [
string
],
"keep-alive": [
string
],
"large-allocation": integer,
"last-modified": string,
"last-modified_epoch_utc": integer,
"link": [
string
],
"max-forwards": integer,
"memento-datetime": string,
"memento-datetime_epoch_utc": integer,
"permissions-policy": [
string
],
"pragma": [
string
],
"proxy-authenticate": [
string
],
"retry-after": string,
"retry-after_epoch_utc": integer,
"rtt": integer,
"sec-ch-ua": [
string
],
"sec-ch-ua-full-version-list": [
string
],
"server": [
string
],
"server-timing": [
string
],
"set-cookie": [
string
],
"timing-allow-origin": [
string
],
"trailer": [
string
],
"transfer-encoding": [
string
],
"upgrade": [
string
],
"upgrade-insecure-requests": integer,
"vary": [
string
],
"via": [
string
],
"warning": [
string
],
"www-authenticate": [
string
],
"x-cache-hits": [
integer
],
"x-content-duration": float
}
]
Examples:
$ curl-head | jc --curl-head -p
[]
$ curl-head | jc --curl-head -p -r
[]
"""
from typing import List, Dict
from jc.jc_types import JSONDictType
import jc.utils
import jc.parsers.http_headers as headers_parser
class info():
"""Provides parser metadata (version, author, etc.)"""
version = '1.0'
description = '`curl --head` command parser'
author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com'
details = 'Using the http-headers parser.'
compatible = ['linux', 'darwin', 'cygwin', 'win32', 'aix', 'freebsd']
tags = ['command', 'standard']
magic_commands = ['curl']
__version__ = info.version
def _remove_extra_chars(data: str, verbose: bool) -> str:
if data.startswith('> ') or data.startswith('< '):
return data[2:]
elif data.startswith('* '):
return ''
elif verbose:
return ''
else:
return data
def _process(proc_data: List[JSONDictType]) -> List[JSONDictType]:
"""
Final processing to conform to the schema.
Parameters:
proc_data: (List of Dictionaries) raw structured data to process
Returns:
List of Dictionaries. Structured to conform to the schema.
"""
return proc_data
def parse(
data: str,
raw: bool = False,
quiet: bool = False
) -> List[JSONDictType]:
"""
Main text parsing function
Parameters:
data: (string) text data to parse
raw: (boolean) unprocessed output if True
quiet: (boolean) suppress warning messages if True
Returns:
List of Dictionaries. Raw or processed structured data.
"""
jc.utils.compatibility(__name__, info.compatible, quiet)
jc.utils.input_type_check(data)
raw_output: List[Dict] = []
curl_verbose = False
if jc.utils.has_data(data):
data_list = data.splitlines()
if data_list[0].startswith('* '):
curl_verbose = True
data_list = [_remove_extra_chars(x, verbose=curl_verbose) for x in data_list]
data_str = '\n'.join(data_list)
raw_output = headers_parser.parse(data_str, raw, quiet)
return raw_output if raw else _process(raw_output)

444
jc/parsers/http_headers.py Normal file
View File

@ -0,0 +1,444 @@
"""jc - JSON Convert HTTP headers parser
Converts HTTP request and response headers into a list of dictionaries.
Well-known headers are processed to allow multiple instances which are
aggregated into an array along with any comma-separated values. Integer,
float, and datetimes are converted where defined in the specifications.
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers
https://datatracker.ietf.org/doc/html/rfc2616
https://datatracker.ietf.org/doc/html/rfc3229
https://datatracker.ietf.org/doc/html/rfc7089
https://datatracker.ietf.org/doc/html/rfc7231
https://www.rfc-editor.org/rfc/rfc5789
If you are converting HTTP headers from `curl` verbose output, use the
`curl-head` parser which will strip the `>` and `<` characters and remove
non-header lines that begin with `*`.
Usage (cli):
$ cat headers.txt | jc --http-headers
Usage (module):
import jc
result = jc.parse('http_headers', http_headers_output)
Schema:
[
{
"<header>": string,
"accept": [
string
],
"accept-ch": [
string
],
"accept-ch-lifetime": integer,
"accept-encoding": [
string
],
"accept-language": [
string
],
"accept-patch": [
string
],
"accept-post": [
string
],
"accept-ranges": [
string
],
"access-control-allow-headers": [
string
],
"access-control-allow-methods": [
string
],
"access-control-expose-headers": [
string
],
"access-control-max-age": integer,
"access-control-request-headers": [
string
],
"age": integer,
"allow": [
string
],
"alt-svc": [
string
],
"cache-control": [
string
],
"clear-site-data": [
string
],
"connection": [
string
],
"content-encoding": [
string
],
"content-dpr": integer,
"content-language": [
string
],
"content-length": integer,
"content-security-policy": [
string
],
"content-security-policy-report-only": [
string
],
"cookie": [
string
],
"critical-ch": [
string
],
"date": string,
"date_epoch_utc": integer,
"expect-ct": [
string
],
"expires": string,
"expires_epoch_utc": integer,
"device-memory": integer,
"downlink": integer,
"dpr": integer,
"forwarded": [
string
],
"if-match": [
string
],
"if-modified-since": string,
"if-modified-since_epoch_utc": integer,
"if-none-match": [
string
],
"if-range": string,
"if-range_epoch_utc": integer,
"if-unmodified-since": string,
"if-unmodified-since_epoch_utc": integer,
"im": [
string
],
"keep-alive": [
string
],
"large-allocation": integer,
"last-modified": string,
"last-modified_epoch_utc": integer,
"link": [
string
],
"max-forwards": integer,
"memento-datetime": string,
"memento-datetime_epoch_utc": integer,
"permissions-policy": [
string
],
"pragma": [
string
],
"proxy-authenticate": [
string
],
"retry-after": string,
"retry-after_epoch_utc": integer,
"rtt": integer,
"sec-ch-ua": [
string
],
"sec-ch-ua-full-version-list": [
string
],
"server": [
string
],
"server-timing": [
string
],
"set-cookie": [
string
],
"timing-allow-origin": [
string
],
"trailer": [
string
],
"transfer-encoding": [
string
],
"upgrade": [
string
],
"upgrade-insecure-requests": integer,
"vary": [
string
],
"via": [
string
],
"warning": [
string
],
"www-authenticate": [
string
],
"x-cache-hits": [
integer
],
"x-content-duration": float
}
]
Examples:
$ curl-head | jc --curl-head -p
[]
$ curl-head | jc --curl-head -p -r
[]
"""
from typing import List, Dict
from jc.jc_types import JSONDictType
import jc.utils
class info():
"""Provides parser metadata (version, author, etc.)"""
version = '1.0'
description = 'HTTP headers parser'
author = 'Kelly Brazil'
author_email = 'kellyjonbrazil@gmail.com'
compatible = ['linux', 'darwin', 'cygwin', 'win32', 'aix', 'freebsd']
tags = ['standard', 'file']
__version__ = info.version
METHODS = {'connect', 'delete', 'get', 'head', 'options', 'patch', 'post', 'put', 'trace'}
INT_HEADERS = {
'accept-ch-lifetime',
'access-control-max-age',
'age',
'content-dpr',
'content-length',
'device-memory',
'downlink',
'dpr',
'large-allocation',
'max-forwards',
'rtt',
'upgrade-insecure-requests'
}
FLOAT_HEADERS = {
'x-content-duration'
}
DT_HEADERS = {
'date',
'if-modified-since',
'if-unmodified-since',
'last-modified',
'memento-datetime'
}
DT_OR_INT_HEADERS = {
'expires',
'retry-after'
}
DT_OR_STR_HEADERS = {
'if-range'
}
MULTI_HEADERS = {
'content-security-policy',
'content-security-policy-report-only',
'cookie',
'set-cookie'
}
SPLIT_AND_MULTI_HEADERS = {
'accept',
'accept-ch',
'accept-encoding',
'accept-language',
'accept-patch',
'accept-post',
'accept-ranges',
'access-control-allow-headers',
'access-control-allow-methods',
'access-control-expose-headers',
'access-control-request-headers',
'allow',
'alt-svc',
'cache-control',
'clear-site-data',
'connection',
'content-encoding',
'content-language',
'critical-ch',
'expect-ct',
'forwarded',
'if-match',
'if-none-match',
'im',
'keep-alive',
'link',
'permissions-policy',
'pragma',
'proxy-authenticate',
'sec-ch-ua',
'sec-ch-ua-full-version-list',
'server',
'server-timing',
'timing-allow-origin',
'trailer',
'transfer-encoding',
'upgrade',
'vary',
'via',
'warning',
'www-authenticate',
'x-cache-hits'
}
def _process(proc_data: List[JSONDictType]) -> List[JSONDictType]:
"""
Final processing to conform to the schema.
Parameters:
proc_data: (List of Dictionaries) raw structured data to process
Returns:
List of Dictionaries. Structured to conform to the schema.
"""
for item in proc_data:
for key in item.copy():
if key in INT_HEADERS:
item[key] = jc.utils.convert_to_int(item[key])
if key in FLOAT_HEADERS:
item[key] = jc.utils.convert_to_float(item[key])
if key in DT_HEADERS or key in DT_OR_STR_HEADERS:
item[key + '_epoch_utc'] = jc.utils.timestamp(item[key], format_hint=(3500,)).utc
if key in DT_OR_INT_HEADERS:
timestamp = jc.utils.timestamp(item[key], format_hint=(3500,)).utc
int_val = jc.utils.convert_to_int(item[key])
if timestamp:
item[key + '_epoch_utc'] = timestamp
if int_val is not None:
item[key] = int_val
# special handling
if 'x-cache-hits' in item:
item['x-cache-hits'] = [jc.utils.convert_to_int(val) for val in item['x-cache-hits']]
return proc_data
def parse(
data: str,
raw: bool = False,
quiet: bool = False
) -> List[JSONDictType]:
"""
Main text parsing function
Parameters:
data: (string) text data to parse
raw: (boolean) unprocessed output if True
quiet: (boolean) suppress warning messages if True
Returns:
List of Dictionaries. Raw or processed structured data.
"""
jc.utils.compatibility(__name__, info.compatible, quiet)
jc.utils.input_type_check(data)
raw_output: List[Dict] = []
output_object: Dict = {}
if jc.utils.has_data(data):
for line in filter(None, data.splitlines()):
first_word = line.split(maxsplit=1)[0]
first_word = first_word.rstrip(':')
first_word = first_word.lower()
if first_word in METHODS:
if output_object:
raw_output.append(output_object)
method, uri, version = line.split(maxsplit=2)
output_object = {}
output_object['type'] = 'request'
output_object['request_method'] = method
output_object['request_uri'] = uri
output_object['request_version'] = version
continue
if first_word.startswith('http/'):
if output_object:
raw_output.append(output_object)
reason = None
version, status, *reason = line.split(maxsplit=2)
output_object = {}
output_object['type'] = 'response'
output_object['response_version'] = version
output_object['response_status'] = int(status)
output_object['response_reason'] = reason or None
continue
if first_word in SPLIT_AND_MULTI_HEADERS:
key, value = line.split(': ', maxsplit=1)
key = key.lower()
value_list = value.split(',')
value_list = [x.strip() for x in value_list]
if key in output_object:
output_object[key].extend(value_list)
else:
output_object[key] = []
output_object[key].extend(value_list)
continue
if first_word in MULTI_HEADERS:
key, value = line.split(': ', maxsplit=1)
key = key.lower()
if key in output_object:
output_object[key].append(value)
else:
output_object[key] = []
output_object[key].append(value)
continue
# All other headers
key, value = line.split(': ', maxsplit=1)
key = key.lower()
output_object[key] = value
if output_object:
raw_output.append(output_object)
return raw_output if raw else _process(raw_output)