1
0
mirror of https://github.com/MarkParker5/STARK.git synced 2025-09-16 09:36:24 +02:00

WIP: started object parser class implementation

This commit is contained in:
Mark Parker
2025-09-03 19:43:58 +02:00
parent 4c8dfad005
commit 44a12ec91b
2 changed files with 106 additions and 76 deletions

View File

@@ -1,8 +1,5 @@
from __future__ import annotations
import logging
logger = logging.getLogger(__name__)
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Generator, Type, TypeAlias
@@ -15,6 +12,9 @@ if TYPE_CHECKING:
from ..types import Object
ObjectType: TypeAlias = Type[Object]
import logging
logger = logging.getLogger(__name__)
@dataclass
class MatchResult:
@@ -23,13 +23,39 @@ class MatchResult:
end: int
parameters: dict[str, Object | None]
class ParseResult(NamedTuple):
obj: Object
substring: str
class ParseError(Exception):
pass
class PatternParameter(NamedTuple):
class ObjectParser:
async def did_parse(self, obj: Object, from_string: str) -> str:
'''
This method is called after parsing from string and setting parameters found in pattern.
You will very rarely, if ever, need to call this method directly.
Override this method for more complex parsing from string.
Returns:
Minimal substring that is required to parse value.
Raises:
ParseError: if parsing failed.
'''
return from_string
class PatternParameter(NamedTuple): # TODO: dataclass?
name: str
type_name: str
optional: bool
class RegisteredParameterType(NamedTuple):
name: str
type: ObjectType
optional: bool
parser: ObjectParser
class ParameterMatch(NamedTuple):
name: str
@@ -47,14 +73,27 @@ class Pattern:
_origin: str
_parameter_regex: re.Pattern
_parameter_types: dict[str, ObjectType] = {} # static
_parameter_types: dict[str, RegisteredParameterType] = {}
def __init__(self, origin: str):
self._origin = origin
self._parameter_regex = self._get_parameter_regex()
self.parameters = dict(self._get_parameters())
self._parameter_regex = self._get_pattern_parameter_annotation_regex()
self.parameters = dict(self._get_parameters_annotation_from_pattern())
self.compiled = self._compile()
@classmethod
def add_parameter_type(cls, object_type: ObjectType, parser: ObjectParser | None = None):
error_msg = f'Can`t add parameter type "{object_type.__name__}": pattern parameters do not match properties annotated in class'
# TODO: update schema and validation; handle optional parameters; handle short form where type is defined in object
# assert object_type.pattern.parameters.items() <= object_type.__annotations__.items(), error_msg
exists_type = cls._parameter_types.get(object_type.__name__)
assert exists_type is None or exists_type.type == object_type, f'Can`t add parameter type: {object_type.__name__} already exists'
cls._parameter_types[object_type.__name__] = RegisteredParameterType(
name=object_type.__name__,
type=object_type,
parser=parser or ObjectParser()
)
async def match(self, string: str, objects_cache: dict[str, Object] | None = None) -> list[MatchResult]:
if objects_cache is None:
@@ -115,23 +154,25 @@ class Pattern:
new_match.groupdict().items()
))
logger.debug(f'Found parameters: {[(new_match.start(name), name, match_str_groups[name]) for name in sorted(match_str_groups.keys(), key=lambda n: (int(self.parameters[n].type.greedy), new_match.start(n)))]}')
logger.debug(f'Found parameters: {[(new_match.start(name), name, match_str_groups[name]) for name in sorted(match_str_groups.keys(), key=lambda n: (int(self._parameter_types[self.parameters[parameter_name].type_name].type.greedy), new_match.start(n)))]}')
if not match_str_groups:
break # everything's parsed (probably successfully)
# parse next parameter
name = min(
parameter_name = min(
match_str_groups.keys(),
key=lambda name: (
int(self.parameters[name].type.greedy), # parse greedy the last so they don't absorb neighbours
int(self._parameter_types[self.parameters[parameter_name].type_name].type.greedy), # parse greedy the last so they don't absorb neighbours
new_match.start(name) # parse left to right
)
)
raw_param_substr = match_str_groups[name].strip()
parameter_reg_type = self._parameter_types[self.parameters[parameter_name].type_name]
parameter_type = parameter_reg_type.type
raw_param_substr = match_str_groups[parameter_name].strip()
logger.debug(f'Parsing {name} from {raw_param_substr}')
logger.debug(f'Parsing {parameter_name} from {raw_param_substr}')
# try to get object from cache
for cached_parsed_substr, cached_parsed_obj in objects_cache.items():
@@ -147,22 +188,22 @@ class Pattern:
# )
# break
else: # No cache, parse the object
object_type = self.parameters[name].type
try:
object_matches = await object_type.pattern.match(raw_param_substr, objects_cache)
object_matches = await parameter_type.pattern.match(raw_param_substr, objects_cache)
if not object_matches:
raise ParseError(f"Failed to match object {object_type} from {raw_param_substr}")
raise ParseError(f"Failed to match object {parameter_type} from {raw_param_substr}")
object_pattern_match = object_matches[0]
parse_result = await object_type.parse(
parse_result = await self._parse_object(
parameter_reg_type.type,
parameter_reg_type.parser,
from_string=object_pattern_match.substring,
parsed_parameters=object_pattern_match.parameters
)
except ParseError as e:
logger.error(f"Pattern.match ParseError: {e}")
# explicitly set match result with None obj so it won't stuck in an infinite retry loop
parsed_parameters[name] = ParameterMatch(
name=name,
parsed_parameters[parameter_name] = ParameterMatch(
name=parameter_name,
regex_substr=raw_param_substr,
parsed_obj=None,
parsed_substr='',
@@ -170,13 +211,13 @@ class Pattern:
continue
objects_cache[parse_result.substring] = parse_result.obj
parsed_parameters[name] = ParameterMatch(
name=name,
parsed_parameters[parameter_name] = ParameterMatch(
name=parameter_name,
regex_substr=raw_param_substr,
parsed_obj=parse_result.obj,
parsed_substr=parse_result.substring,
)
logger.debug(f"Pattern.match: name={name} raw_param_substr={raw_param_substr} parse_result.substring={parse_result.substring}")
logger.debug(f"Pattern.match: name={parameter_name} raw_param_substr={raw_param_substr} parse_result.substring={parse_result.substring}")
# Validate parsed parameters
@@ -209,34 +250,47 @@ class Pattern:
return sorted(matches, key = lambda m: len(m.substring), reverse = True)
@classmethod
def add_parameter_type(cls, object_type: ObjectType):
error_msg = f'Can`t add parameter type "{object_type.__name__}": pattern parameters do not match properties annotated in class'
# TODO: update schema and validation; handle optional parameters; handle short form where type is defined in object
# assert object_type.pattern.parameters.items() <= object_type.__annotations__.items(), error_msg
exists_type = cls._parameter_types.get(object_type.__name__)
assert exists_type in {object_type, None}, f'Can`t add parameter type: {object_type.__name__} already exists'
cls._parameter_types[object_type.__name__] = object_type
# parsing
# private
async def _parse_object(self, object_type: type[Object], parser: ObjectParser, from_string: str, parsed_parameters: dict[str, Object | None] | None = None) -> ParseResult:
def _get_parameter_regex(self) -> re.Pattern:
obj = object_type(from_string) # temp/default value, may be overridden by did_parse
parsed_parameters = parsed_parameters or {}
assert parsed_parameters.keys() <= {p.name for p in object_type.pattern.parameters.values() if not p.optional}
for name in object_type.pattern.parameters:
value = parsed_parameters[name]
setattr(obj, name, value)
substring = await parser.did_parse(obj, from_string)
substring = await obj.did_parse(substring)
assert substring.strip(), ValueError(f'Parsed substring must not be empty. Object: {obj}, Parser: {parser}')
assert substring in from_string, ValueError(f'Parsed substring must be a part of the original string. There is no {substring} in {from_string}. Object: {obj}, Parser: {parser}')
assert obj.value is not None, ValueError(f'Parsed object {obj} must have a `value` property set in did_parse method. Object: {obj}, Parser: {parser}')
return ParseResult(obj, substring)
# processing pattern
def _get_pattern_parameter_annotation_regex(self) -> re.Pattern:
# types = '|'.join(Pattern._parameter_types.keys())
# return re.compile(r'\$(?P<name>[A-z][A-z0-9]*)\:(?P<type>(?:' + types + r'))')
# do not use types list because it prevents validation of unknown types
return re.compile(r'\$(?P<name>[A-z][A-z0-9]*)\:(?P<type>[A-z][A-z0-9]*)(?P<optional>\?)?')
def _get_parameters(self) -> Generator[tuple[str, PatternParameter], None, None]:
def _get_parameters_annotation_from_pattern(self) -> Generator[tuple[str, PatternParameter], None, None]:
for match in re.finditer(self._parameter_regex, self._origin):
arg_name = match.group('name')
arg_type_name = match.group('type')
arg_optional = bool(match.group('optional'))
arg_type: ObjectType | None = Pattern._parameter_types.get(arg_type_name)
if not arg_type:
reg_type: RegisteredParameterType | None = Pattern._parameter_types.get(arg_type_name)
if not reg_type:
raise NameError(f'Unknown type: "{arg_type_name}" for parameter: "{arg_name}" in pattern: "{self._origin}"')
yield arg_name, PatternParameter(arg_name, arg_type, arg_optional)
yield arg_name, PatternParameter(arg_name, arg_type_name, arg_optional)
def _compile(self, prefill: dict[str, str] | None = None) -> str: # transform Pattern to classic regex with named groups
prefill = prefill or {}
@@ -251,25 +305,28 @@ class Pattern:
# find and transform parameters like $name:Type
for parameter in self.parameters.values():
parameter_type = self._parameter_types[parameter.type_name].type
arg_declaration = f'\\${parameter.name}\\:{parameter.type.__name__}' # NOTE: special chars escaped for regex
arg_declaration = f'\\${parameter.name}\\:{parameter_type.__name__}' # NOTE: special chars escaped for regex
if parameter.name in prefill:
arg_pattern = re.escape(prefill[parameter.name])
else:
arg_pattern = parameter.type.pattern.compiled.replace('\\', r'\\')
else: # replace pattern annotation with compiled regex
arg_pattern = parameter_type.pattern.compiled.replace('\\', r'\\')
if parameter.type.greedy and arg_pattern[-1] in {'*', '+', '}', '?'}:
# compensate greedy did_parse with non-greedy regex, so it won't consume next params during initial regex
arg_pattern += '?'
if pattern.endswith(f'${parameter.name}:{parameter.type.__name__}'): # NOTE: regex chars are NOT escaped
# compensate the last non_greedy regex to allow consuming till the end of the string
# middle greedy params are limited/stretched by neighbor params
arg_pattern += '$'
if parameter_type.greedy and arg_pattern[-1] in {'*', '+', '}', '?'}:
# compensate greedy did_parse with non-greedy regex, so it won't consume next params during initial regex
arg_pattern += '?'
if pattern.endswith(f'${parameter.name}:{parameter_type.__name__}'): # NOTE: regex chars are NOT escaped
# compensate the last non_greedy regex to allow consuming till the end of the string
# middle greedy params are limited/stretched by neighbor params
arg_pattern += '$'
pattern = re.sub(arg_declaration, f'(?P<{parameter.name}>{arg_pattern})', pattern)
return pattern
# dunder methods
def __eq__(self, other: object) -> bool:
if not isinstance(other, Pattern):
raise NotImplementedError(f'Can`t compare Pattern with {type(other)}')

View File

@@ -2,14 +2,12 @@ from __future__ import annotations
import copy
from abc import ABC
from collections import namedtuple
from typing import Any
from stark.general.classproperty import classproperty
from .. import Pattern
ParseResult = namedtuple('ParseResult', ['obj', 'substring'])
class Object[T](ABC):
@@ -38,41 +36,16 @@ class Object[T](ABC):
Override this method for more complex parsing from string.
If you need even more complex setup, for example, a long living object with DI, use define an ObjectParser subclass and pass it's instance to Pattern.add_parameter_type along with the Object's class. If both `did_parse` are defined, the ObjectParser's will be called first.
Returns:
Minimal substring that is required to parse value.
Raises:
ParseError: if parsing failed.
'''
self.value = from_string # type: ignore # default behavior assuming the value is the whole string matched by the pattern
return from_string
@classmethod
async def parse(cls, from_string: str, parsed_parameters: dict[str, Object | None] | None = None) -> ParseResult:
'''
For internal use only.
You will very rarely, if ever, need to override or even call this method.
Override `def did_parse(self, from_string: str) -> str` if you need complex parsing.
Raises:
ParseError: if parsing failed.
'''
obj = cls(None)
parsed_parameters = parsed_parameters or {}
assert parsed_parameters.keys() <= {p.name for p in cls.pattern.parameters.values() if not p.optional}
for name in cls.pattern.parameters:
value = parsed_parameters[name]
setattr(obj, name, value)
substring = await obj.did_parse(from_string)
assert substring.strip(), ValueError('Parsed substring must not be empty.')
assert substring in from_string, ValueError(f'Parsed substring must be a part of the original string. There is no {substring} in {from_string}.')
assert obj.value is not None, ValueError(f'Parsed object {obj} must have a `value` property set in did_parse method. It is not set in {type(obj)}.')
return ParseResult(obj, substring)
def copy(self) -> Object:
return copy.copy(self)