1
0
mirror of https://github.com/MarkParker5/STARK.git synced 2024-11-18 16:31:47 +02:00

commands manager: json.dumps and get by name

This commit is contained in:
Mark Parker 2023-09-21 20:31:01 +02:00
parent 73a90776ed
commit 7a6d2ff55a
No known key found for this signature in database
GPG Key ID: C10A60786A07A300
6 changed files with 187 additions and 1 deletions

View File

@ -5,6 +5,7 @@ import warnings
from uuid import UUID, uuid4
from enum import auto, Enum
from datetime import datetime
import json
import inspect
from pydantic import BaseModel, Field

View File

@ -2,6 +2,7 @@ from __future__ import annotations
from dataclasses import dataclass
import inspect
from asyncer import create_task_group, SoonValue
import json
from .patterns import Pattern, MatchResult
from .types import Object
@ -22,6 +23,12 @@ class CommandsManager:
def __init__(self, name: str = ''):
self.name = name or 'CommandsManager'
self.commands = []
def get_by_name(self, name: str) -> Command | None:
for command in self.commands:
if command.name in {f'{self.name}.{name}', name}:
return command
return None
async def search(self, string: str, commands: list[Command] | None = None) -> list[SearchResult]:

View File

@ -3,6 +3,7 @@ from typing import Type, Generator, TypeAlias, TYPE_CHECKING
from dataclasses import dataclass
import re
from asyncer import create_task_group, SoonValue
import json
from .expressions import dictionary
if TYPE_CHECKING:

View File

@ -0,0 +1,77 @@
# from typing import _SpecialGenericAlias
import json
import inspect
from stark import Pattern, Command, CommandsManager
class StarkJsonEncoder(json.JSONEncoder):
def default(self, obj):
if hasattr(obj, '__json__'):
return obj.__json__()
elif isinstance(obj, Pattern):
return {
'origin': obj._origin
}
elif isinstance(obj, Command):
return {
'name': obj.name,
'pattern': obj.pattern,
'declaration': self.get_function_declaration(obj._runner),
'docstring': inspect.getdoc(obj._runner) or ''
}
elif isinstance(obj, CommandsManager):
return {
'name': obj.name,
'commands': obj.commands,
}
else:
return super().default(obj)
def get_function_declaration(self, func):
get_name = lambda x: x.__name__ if hasattr(x, '__name__') else x
# return inspect.getsourcelines(func)[0][:2]
signature = inspect.signature(func)
parameters = []
for name, parameter in signature.parameters.items():
# if issubclass(parameter.annotation, _SpecialGenericAlias): for typing.Generator and etc
# parameters.append(str(parameter))
# continue
param_str = name
if parameter.annotation != inspect.Parameter.empty:
annotation = get_name(parameter.annotation)
param_str += f': {annotation}'
if parameter.default != inspect.Parameter.empty:
default_value = parameter.default
if default_value is None:
default_str = 'None'
elif isinstance(default_value, str):
default_str = f"'{default_value}'"
else:
default_str = str(default_value)
param_str += f' = {default_str}'
parameters.append(param_str)
func_type = 'async def' if inspect.iscoroutinefunction(func) else 'def'
name = func.__name__
parameters_str = ', '.join(parameters)
return_annotation = get_name(signature.return_annotation)
if signature.return_annotation != inspect.Parameter.empty:
return f'{func_type} {name}({parameters_str}) -> {return_annotation}'
else:
return f'{func_type} {name}({parameters_str})'

View File

@ -80,4 +80,29 @@ def test_extend_manager():
assert child_manager.commands[0].name == 'Child.test'
assert root_manager.commands[0].name == 'CommandsManager.test'
assert root_manager.commands[1].name == 'Child.test'
def test_manager_get_command_by_name():
manager = CommandsManager('TestManager')
child = CommandsManager('Child')
@manager.new('')
def test(): ...
@manager.new('')
def test2(): ...
@manager.new('')
def test3(): ...
@manager.new('')
def test4(): ...
@child.new('')
def test5(): ...
manager.extend(child)
assert manager.get_by_name('test2') == test2
assert manager.get_by_name('TestManager.test3') == test3
assert manager.get_by_name('test5') == None
assert manager.get_by_name('Child.test5') == test5

View File

@ -0,0 +1,75 @@
from typing import AsyncGenerator, Type, Any, Callable
import pytest
from stark.core import (
CommandsManager,
Response,
ResponseHandler,
AsyncResponseHandler,
ResponseStatus
)
from stark.core.types import Word
from stark.general.json_encoder import StarkJsonEncoder
import json
async def test_command_json():
manager = CommandsManager('TestManager')
@manager.new('test pattern $word:Word')
def test(var: str, word: Word, foo: int | None = None) -> Response:
'''test command'''
return Response(text=var)
string = json.dumps(test, cls = StarkJsonEncoder)
parsed = json.loads(string)
assert parsed['name'] == 'TestManager.test'
assert parsed['pattern']['origin'] == r'test pattern $word:Word'
assert parsed['declaration'] == 'def test(var: str, word: Word, foo: int | None = None) -> Response'
assert parsed['docstring'] == 'test command'
async def test_async_command_complicate_type_json():
manager = CommandsManager('TestManager')
@manager.new('async test')
async def test2(
some: AsyncGenerator[
Callable[
[Any], Type
],
list[None]
]
):
return Response()
string = json.dumps(test2, cls = StarkJsonEncoder)
parsed = json.loads(string)
assert parsed['name'] == 'TestManager.test2'
assert parsed['pattern']['origin'] == r'async test'
assert parsed['declaration'] == 'async def test2(some: AsyncGenerator)' # TODO: improve AsyncGenerator to full type
# assert parsed['declaration'] == 'async def test2(some: AsyncGenerator[Callable[[Any], Type], list[None], None])'
assert parsed['docstring'] == ''
def test_manager_json():
manager = CommandsManager('TestManager')
@manager.new('')
def test(): ...
@manager.new('')
def test2(): ...
@manager.new('')
def test3(): ...
@manager.new('')
def test4(): ...
string = json.dumps(manager, cls = StarkJsonEncoder)
parsed = json.loads(string)
assert parsed['name'] == 'TestManager'
assert {c['name'] for c in parsed['commands']} == {'TestManager.test', 'TestManager.test2', 'TestManager.test3', 'TestManager.test4',}