diff --git a/.gitignore b/.gitignore index 3732a1e5..878b8e5c 100755 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ build/ .github/ .vscode/ _config.yml +.venv diff --git a/jc/lib.py b/jc/lib.py index ba057347..cb2240db 100644 --- a/jc/lib.py +++ b/jc/lib.py @@ -177,6 +177,7 @@ parsers: List[str] = [ 'sshd-conf', 'stat', 'stat-s', + 'swapon', 'sysctl', 'syslog', 'syslog-s', diff --git a/jc/parsers/swapon.py b/jc/parsers/swapon.py new file mode 100644 index 00000000..1d2d1c6d --- /dev/null +++ b/jc/parsers/swapon.py @@ -0,0 +1,176 @@ +"""jc - JSON Convert `swapon` command output parser + +> Note: Must use `swapon` + +Usage (cli): + + $ swapon | jc --swapon + +or + + $ jc swapon + +Usage (module): + + import jc + result = jc.parse('swapon', uname_command_output) + +Schema: + + [ + { + "name": string, + "type": string, + "size": int, + "used": int, + "priority": int, + } + ] + +Example: + + $ swapon | jc --swapon + [ + { + "name": "/swapfile", + "type": "file", + "size": 1073741824, + "used": 0, + "priority": -2 + } + ] +""" +from enum import Enum +from jc.exceptions import ParseError +import jc.utils +from typing import List, Dict, Union + + +class info: + """Provides parser metadata (version, author, etc.)""" + + version = "1.0" + description = "`swapon` command parser" + author = "Roey Darwish Dror" + author_email = "roey.ghost@gmail.com" + compatible = ["linux", "freebsd"] + magic_commands = ["swapon"] + tags = ["command"] + + +__version__ = info.version + +_Value = Union[str, int] +_Entry = Dict[str, _Value] + + +class _Column(Enum): + NAME = "name" + TYPE = "type" + SIZE = "size" + USED = "used" + PRIO = "priority" + LABEL = "label" + UUID = "uuid" + + @classmethod + def from_header(cls, header: str) -> "_Column": + if (header == "NAME") or (header == "Filename"): + return cls.NAME + elif (header == "TYPE") or (header == "Type"): + return cls.TYPE + elif (header == "SIZE") or (header == "Size"): + return cls.SIZE + elif (header == "USED") or (header == "Used"): + return cls.USED + elif (header == "PRIO") or (header == "Priority"): + return cls.PRIO + elif header == "LABEL": + return cls.LABEL + elif header == "UUID": + return cls.UUID + else: + raise ParseError(f"Unknown header: {header}") + + +def _parse_size(size: str) -> int: + power = None + if size[-1] == "B": + power = 0 + if size[-1] == "K": + power = 1 + elif size[-1] == "M": + power = 2 + elif size[-1] == "G": + power = 3 + elif size[-1] == "T": + power = 4 + + multiplier = 1024**power if power is not None else 1024 + + return (int(size[:-1]) if power is not None else int(size)) * multiplier + + +def _value(value: str, column: _Column) -> _Value: + if column == _Column.SIZE or column == _Column.USED: + return _parse_size(value) + elif column == _Column.PRIO: + return int(value) + else: + return value + + +def _process(proc_data: List[Dict]) -> List[Dict]: + """ + Final processing to conform to the schema. + + Parameters: + + proc_data: (List of Dictionaries) raw structured data to process + + Returns: + + List of Dictionaries. Structured to conform to the schema. + """ + return proc_data + + +def parse(data: str, raw: bool = False, quiet: bool = False) -> List[_Entry]: + """ + Main text parsing function + + Parameters: + + data: (string) text data to parse + raw: (boolean) unprocessed output if True + quiet: (boolean) suppress warning messages if True + + Returns: + + Dictionary. Raw or processed structured data. + """ + jc.utils.compatibility(__name__, info.compatible, quiet) + jc.utils.input_type_check(data) + + raw_output: List[dict] = [] + + if jc.utils.has_data(data): + lines = iter(data.splitlines()) + headers = next(lines) + columns = headers.split() + for line in lines: + line = line.split() + diff = len(columns) - len(line) + if not 0 <= diff <= 2: + raise ParseError( + f"Number of columns ({len(line)}) in line does not match number of headers ({len(columns)})" + ) + + document: _Entry = {} + for column, value in zip(columns, line): + column = _Column.from_header(column) + document[column.value] = _value(value, column) + + raw_output.append(document) + + return raw_output if raw else _process(raw_output) diff --git a/tests/fixtures/generic/swapon-all-v1.json b/tests/fixtures/generic/swapon-all-v1.json new file mode 100644 index 00000000..2fbe6b72 --- /dev/null +++ b/tests/fixtures/generic/swapon-all-v1.json @@ -0,0 +1 @@ +[{"name":"/swap.img","type":"file","size":2147483648,"used":2097152,"priority":-2,"uuid":"0918d27e-3907-471d-abb8-45fa49ae059c"}] diff --git a/tests/fixtures/generic/swapon-all-v1.out b/tests/fixtures/generic/swapon-all-v1.out new file mode 100644 index 00000000..df9a73ae --- /dev/null +++ b/tests/fixtures/generic/swapon-all-v1.out @@ -0,0 +1,2 @@ +NAME TYPE SIZE USED PRIO UUID LABEL +/swap.img file 2G 2M -2 0918d27e-3907-471d-abb8-45fa49ae059c diff --git a/tests/fixtures/generic/swapon-all-v2.json b/tests/fixtures/generic/swapon-all-v2.json new file mode 100644 index 00000000..fc6f3cbf --- /dev/null +++ b/tests/fixtures/generic/swapon-all-v2.json @@ -0,0 +1 @@ +[{"name":"/swapfile","type":"file","size":1073741824,"used":524288,"priority":-2}] diff --git a/tests/fixtures/generic/swapon-all-v2.out b/tests/fixtures/generic/swapon-all-v2.out new file mode 100644 index 00000000..2c0111da --- /dev/null +++ b/tests/fixtures/generic/swapon-all-v2.out @@ -0,0 +1,2 @@ +NAME TYPE SIZE USED PRIO UUID LABEL +/swapfile file 1024M 512K -2 diff --git a/tests/test_swapon.py b/tests/test_swapon.py new file mode 100644 index 00000000..447f4ee3 --- /dev/null +++ b/tests/test_swapon.py @@ -0,0 +1,42 @@ +import os +import unittest +import json +from typing import Dict +from jc.parsers.swapon import parse + +THIS_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class Swapon(unittest.TestCase): + f_in: Dict = {} + f_json: Dict = {} + + @classmethod + def setUpClass(cls): + fixtures = { + "swapon_all": ("fixtures/generic/swapon-all-v1.out", "fixtures/generic/swapon-all-v1.json"), + "swapon_all_v2": ("fixtures/generic/swapon-all-v2.out", "fixtures/generic/swapon-all-v2.json"), + } + + for file, filepaths in fixtures.items(): + with open(os.path.join(THIS_DIR, filepaths[0]), "r", encoding="utf-8") as a, open( + os.path.join(THIS_DIR, filepaths[1]), "r", encoding="utf-8" + ) as b: + cls.f_in[file] = a.read() + cls.f_json[file] = json.loads(b.read()) + + def test_swapon_all(self): + """ + Test 'swapon --output-all' + """ + self.assertEqual(parse(self.f_in["swapon_all"], quiet=True), self.f_json["swapon_all"]) + + def test_swapon_all_v2(self): + """ + Test 'swapon --output-all' + """ + self.assertEqual(parse(self.f_in["swapon_all_v2"], quiet=True), self.f_json["swapon_all_v2"]) + + +if __name__ == "__main__": + unittest.main()