1
0
mirror of https://github.com/kellyjonbrazil/jc.git synced 2025-07-13 01:20:24 +02:00

swapon parser (#383) (#489)

* swapon parser

* revert lib

* fix lib

* Added tests

* Fix tests

---------

Co-authored-by: Kelly Brazil <kellyjonbrazil@gmail.com>
This commit is contained in:
Roey Darwish Dror
2023-11-28 21:15:40 +02:00
committed by GitHub
parent f44260603e
commit 3de6eac1ad
8 changed files with 226 additions and 0 deletions

1
.gitignore vendored
View File

@ -6,3 +6,4 @@ build/
.github/ .github/
.vscode/ .vscode/
_config.yml _config.yml
.venv

View File

@ -177,6 +177,7 @@ parsers: List[str] = [
'sshd-conf', 'sshd-conf',
'stat', 'stat',
'stat-s', 'stat-s',
'swapon',
'sysctl', 'sysctl',
'syslog', 'syslog',
'syslog-s', 'syslog-s',

176
jc/parsers/swapon.py Normal file
View File

@ -0,0 +1,176 @@
"""jc - JSON Convert `swapon` command output parser
> Note: Must use `swapon`
Usage (cli):
$ swapon | jc --swapon
or
$ jc swapon
Usage (module):
import jc
result = jc.parse('swapon', uname_command_output)
Schema:
[
{
"name": string,
"type": string,
"size": int,
"used": int,
"priority": int,
}
]
Example:
$ swapon | jc --swapon
[
{
"name": "/swapfile",
"type": "file",
"size": 1073741824,
"used": 0,
"priority": -2
}
]
"""
from enum import Enum
from jc.exceptions import ParseError
import jc.utils
from typing import List, Dict, Union
class info:
"""Provides parser metadata (version, author, etc.)"""
version = "1.0"
description = "`swapon` command parser"
author = "Roey Darwish Dror"
author_email = "roey.ghost@gmail.com"
compatible = ["linux", "freebsd"]
magic_commands = ["swapon"]
tags = ["command"]
__version__ = info.version
_Value = Union[str, int]
_Entry = Dict[str, _Value]
class _Column(Enum):
NAME = "name"
TYPE = "type"
SIZE = "size"
USED = "used"
PRIO = "priority"
LABEL = "label"
UUID = "uuid"
@classmethod
def from_header(cls, header: str) -> "_Column":
if (header == "NAME") or (header == "Filename"):
return cls.NAME
elif (header == "TYPE") or (header == "Type"):
return cls.TYPE
elif (header == "SIZE") or (header == "Size"):
return cls.SIZE
elif (header == "USED") or (header == "Used"):
return cls.USED
elif (header == "PRIO") or (header == "Priority"):
return cls.PRIO
elif header == "LABEL":
return cls.LABEL
elif header == "UUID":
return cls.UUID
else:
raise ParseError(f"Unknown header: {header}")
def _parse_size(size: str) -> int:
power = None
if size[-1] == "B":
power = 0
if size[-1] == "K":
power = 1
elif size[-1] == "M":
power = 2
elif size[-1] == "G":
power = 3
elif size[-1] == "T":
power = 4
multiplier = 1024**power if power is not None else 1024
return (int(size[:-1]) if power is not None else int(size)) * multiplier
def _value(value: str, column: _Column) -> _Value:
if column == _Column.SIZE or column == _Column.USED:
return _parse_size(value)
elif column == _Column.PRIO:
return int(value)
else:
return value
def _process(proc_data: List[Dict]) -> List[Dict]:
"""
Final processing to conform to the schema.
Parameters:
proc_data: (List of Dictionaries) raw structured data to process
Returns:
List of Dictionaries. Structured to conform to the schema.
"""
return proc_data
def parse(data: str, raw: bool = False, quiet: bool = False) -> List[_Entry]:
"""
Main text parsing function
Parameters:
data: (string) text data to parse
raw: (boolean) unprocessed output if True
quiet: (boolean) suppress warning messages if True
Returns:
Dictionary. Raw or processed structured data.
"""
jc.utils.compatibility(__name__, info.compatible, quiet)
jc.utils.input_type_check(data)
raw_output: List[dict] = []
if jc.utils.has_data(data):
lines = iter(data.splitlines())
headers = next(lines)
columns = headers.split()
for line in lines:
line = line.split()
diff = len(columns) - len(line)
if not 0 <= diff <= 2:
raise ParseError(
f"Number of columns ({len(line)}) in line does not match number of headers ({len(columns)})"
)
document: _Entry = {}
for column, value in zip(columns, line):
column = _Column.from_header(column)
document[column.value] = _value(value, column)
raw_output.append(document)
return raw_output if raw else _process(raw_output)

View File

@ -0,0 +1 @@
[{"name":"/swap.img","type":"file","size":2147483648,"used":2097152,"priority":-2,"uuid":"0918d27e-3907-471d-abb8-45fa49ae059c"}]

View File

@ -0,0 +1,2 @@
NAME TYPE SIZE USED PRIO UUID LABEL
/swap.img file 2G 2M -2 0918d27e-3907-471d-abb8-45fa49ae059c

View File

@ -0,0 +1 @@
[{"name":"/swapfile","type":"file","size":1073741824,"used":524288,"priority":-2}]

View File

@ -0,0 +1,2 @@
NAME TYPE SIZE USED PRIO UUID LABEL
/swapfile file 1024M 512K -2

42
tests/test_swapon.py Normal file
View File

@ -0,0 +1,42 @@
import os
import unittest
import json
from typing import Dict
from jc.parsers.swapon import parse
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
class Swapon(unittest.TestCase):
f_in: Dict = {}
f_json: Dict = {}
@classmethod
def setUpClass(cls):
fixtures = {
"swapon_all": ("fixtures/generic/swapon-all-v1.out", "fixtures/generic/swapon-all-v1.json"),
"swapon_all_v2": ("fixtures/generic/swapon-all-v2.out", "fixtures/generic/swapon-all-v2.json"),
}
for file, filepaths in fixtures.items():
with open(os.path.join(THIS_DIR, filepaths[0]), "r", encoding="utf-8") as a, open(
os.path.join(THIS_DIR, filepaths[1]), "r", encoding="utf-8"
) as b:
cls.f_in[file] = a.read()
cls.f_json[file] = json.loads(b.read())
def test_swapon_all(self):
"""
Test 'swapon --output-all'
"""
self.assertEqual(parse(self.f_in["swapon_all"], quiet=True), self.f_json["swapon_all"])
def test_swapon_all_v2(self):
"""
Test 'swapon --output-all'
"""
self.assertEqual(parse(self.f_in["swapon_all_v2"], quiet=True), self.f_json["swapon_all_v2"])
if __name__ == "__main__":
unittest.main()