1
0
mirror of https://github.com/MarkParker5/STARK.git synced 2025-02-12 11:46:14 +02:00

new regex in progress

This commit is contained in:
MarkParker5 2023-01-30 08:16:43 +01:00
parent c089d1dd29
commit 64307b98b7
No known key found for this signature in database
GPG Key ID: 0632FDCE0F9ED5C7
48 changed files with 271 additions and 315 deletions

View File

@ -1,5 +1,5 @@
[Unit]
Description=Smarthome Hub
Description=MajorDom VA
After=network.target
StartLimitIntervalSec=0
@ -7,10 +7,10 @@ StartLimitIntervalSec=0
Type=simple
Restart=always
RestartSec=1
User=mark
WorkingDirectory=/home/mark/ArchieHub/SmartHome
User=admin
WorkingDirectory=/home/admin/MajorDom-VA/
ExecStart=python3.10 main.py
StandardOutput=append:/home/mark/logs/smarthome.log
StandardOutput=append:/home/admin/logs/majordom-va.log
Environment=PYTHONUNBUFFERED=1
[Install]

View File

@ -1,67 +0,0 @@
from typing import Callable, Optional
from time import sleep
from VICore.VIObjects import VITime, VITimeInterval
from .. import Singleton, UUID, threadingFunction
from .DispatchQueueItem import DispatchQueueItem
class DispatchQueue(Singleton):
_queue: [DispatchQueueItem] = []
_nearestItemTime: Optional[VITime] = None
def asyncAt(self, time: VITime, execute: Callable, *args, **kwargs) -> UUID:
item = DispatchQueueItem(execute = execute, time = time, args = args, kwargs = kwargs)
self.insert(item)
return item.id
def asyncAfter(self, timeinterval: VITimeInterval, execute: Callable, *args, **kwargs) -> UUID:
time = VITime().addingInterval(timeinterval)
item = DispatchQueueItem(execute = execute, time = time, timeinterval = timeinterval,
args = args, kwargs = kwargs)
self.insert(item)
return item.id
def repeatEvery(self, timeinterval: VITimeInterval, execute: Callable, *args, **kwargs) -> UUID:
time = VITime().addingInterval(timeinterval)
item = DispatchQueueItem(execute = execute, time = time, timeinterval = timeinterval,
repeat = True, args = args, kwargs = kwargs)
self.insert(item)
return item.id
def insert(self, item: DispatchQueueItem):
now = VITime()
low = 0
high = len(self._queue)
while low < high:
mid = (low + high) // 1
if self._queue[mid].time < now: low = mid + 1
else: high = mid
self._queue.insert(low, item)
self._nearestItemTime = self._queue[0].time
def invalidate(self, id: UUID) -> DispatchQueueItem:
for i, item in enumerate(self._queue):
if item.id != id: continue
if i == 0: self._nearestItemTime = self._queue[1].time if len(self._queue) > 1 else None
return self._queue.pop(i)
@threadingFunction
def loop(self):
while True:
if not self._queue or not self._nearestItemTime or self._nearestItemTime > VITime():
sleep(0.1)
continue
item = self._queue.pop(0)
item.execute()
if item.repeat:
item.time.addInterval(item.timeinterval)
self.insert(item)
self._nearestItemTime = self._queue[0].time if self._queue else None
DispatchQueue().loop()

View File

@ -1,24 +0,0 @@
from typing import Callable, Any
from VICore import VITime, VITimeInterval
from .. import Identifable, threadingFunction
class DispatchQueueItem(Identifable):
time: VITime
timeinterval: VITimeInterval
worker: Callable
args: list[Any]
kwargs: dict[Any, Any]
repeat: bool
def __init__(self, execute: Callable, time: VITime = VITime(), timeinterval: VITimeInterval = VITimeInterval(0),
repeat: bool = False, args: list[Any] = [], kwargs: dict[Any, Any] = []):
self.time = time
self.timeinterval = timeinterval
self.worker = execute
self.args = args
self.kwargs = kwargs
self.repeat = repeat
@threadingFunction
def execute(self):
self.worker(*self.args, **self.kwargs)

View File

@ -1,2 +0,0 @@
from .DispatchQueue import DispatchQueue
from .DispatchQueueItem import DispatchQueueItem

View File

@ -1,11 +0,0 @@
from __future__ import annotations
from .UUID import UUID
class Identifable():
id: UUID = UUID()
def __eq__(self, other: Identifable) -> bool:
return self.id == other.id
def __ne__(self, other: Identifable) -> bool:
return self.id != other.id

View File

@ -1,13 +0,0 @@
from typing import Optional
import uuid
class UUID(uuid.UUID):
def __new__(cls, string: Optional[str] = None):
if string:
return super().__new__(cls)
else:
return uuid.uuid1()
def __str__(self) -> str:
return self.hex

View File

@ -1,2 +0,0 @@
from .Identifable import Identifable
from .UUID import UUID

View File

@ -1,12 +0,0 @@
from typing import Any, Optional
from .NotificationName import NotificationName
class Notification:
name: NotificationName
object: Any
data: dict
def __init__(self, name: NotificationName, object: Optional[Any], data: dict):
self.name = name
self.object = object
self.data = data

View File

@ -1,25 +0,0 @@
from typing import Any, Optional
from .. import Singleton, UUID
from . import Notification, NotificationName, NotificationObserver, NotificationCallback
class NotificationCenter(Singleton):
_observers: dict[NotificationName, list[NotificationObserver]] = {}
def post(self, name: NotificationName, object: Optional[Any] = None, data: dict = []):
notify = Notification(name = name, object = object, data = data)
for obser in self._observers.get(name) or []:
obser.callback(notify)
def addObserver(self, notificationName: NotificationName, callback: NotificationCallback) -> NotificationObserver:
if not notificationName in self._observers.keys() :
self._observers[notificationName] = []
observer = NotificationObserver(notificationName, callback)
self._observers[notificationName].append(observer)
return observer
def removeObserver(self, observer: NotificationObserver):
if observer in self._observers.get(observer.notificationName) or []:
self._observers.get(observer.notificationName).remove(observer)

View File

@ -1,4 +0,0 @@
from enum import Enum, auto
class NotificationName(Enum):
test = auto()

View File

@ -1,15 +0,0 @@
from typing import Callable
from .. import Identifable, threadingFunction
from . import Notification, NotificationName
NotificationCallback = Callable[[Notification,], None]
class NotificationObserver(Identifable):
notificationName: NotificationName
callback: NotificationCallback
def __init__(self, notificationName: NotificationName, callback: NotificationCallback):
super().__init__()
self.notificationName = notificationName
self.callback = threadingFunction(callback)

View File

@ -1,4 +0,0 @@
from .Notification import Notification
from .NotificationName import NotificationName
from .NotificationObserver import NotificationObserver, NotificationCallback # noqa
from .NotificationCenter import NotificationCenter

View File

@ -1,9 +0,0 @@
from abc import ABC
class Singleton(ABC):
# Singleton
def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super().__new__(cls)
return cls.instance

View File

@ -1,8 +0,0 @@
from .Singleton import *
from .Identifable import *
from .ThreadingFunction import threadingFunction
#from .StringProcesser import *
from .DispatchQueue import *
from .Notifications import *
from .SpeechRecognition import *
from .Text2Speech import *

View File

@ -1,38 +0,0 @@
from typing import Type, Optional
import re
from .expressions import expressions
from ..VIObjects import *
class Pattern:
origin: str
compiled: str #getonly
def __init__(self, origin: str):
self.origin = origin
@property
def compiled(self) -> str: # transform pattern to classic regex with named groups
pattern: str = self.origin
# transform patterns to regexp
for ptrn, regex in expressions.items():
pattern = re.sub(re.compile(ptrn), regex, pattern)
# find and transform arguments like $name:Type
argumentRegex = re.compile(r'\$[:word:]:[:word:]')
reMatch = re.search(argumentRegex, pattern)
while reMatch:
match = reMatch.pop(0)
arg: str = match[1:]
argName, argTypeName = arg.split(':')
argType: Type[VIObject] = classFromString(argTypeName)
pattern = re.sub('\\'+link[0], f'(?P<{arg}>{argType.pattern.compiled})', pattern)
return re.compile(pattern)
def match(self, string: VIString) -> Optional[dict[str, str]]:
if match := re.search(self.compiled, string.value):
return match.groupdict()
return None

View File

@ -1,2 +0,0 @@
from .Pattern import Pattern
from ..VIObjects import *

View File

@ -1,13 +0,0 @@
expressions = {
# stars *
r'([A-Za-zА-ЯЁа-яё0-9\(\)\[\]\{\}]+)\*([A-Za-zА-ЯЁа-яё0-9\(\)\[\]\{\}]+)': r'\\b\1.*\2\\b', # 'te*xt'
r'\*([A-Za-zА-ЯЁа-яё0-9\(\)\[\]\{\}]+)': r'\\b.*\1', # '*text'
r'([A-Za-zА-ЯЁа-яё0-9\(\)\[\]\{\}]+)\*': r'\1.*\\b', # 'text*'
r'(^|\s)\*($|\s)': r'.*', # '*' ' * '
# one of the list (a|b|c)
r'\(((?:.*\|)*.*)\)': r'(?:\1)',
# 0 or 1 the of list [abc]
r'\[((?:.*\|?)*?.*?)\]': r'(?:\1)??',
# one or more of the list, without order {a|b|c}
r'\{((?:.*\|?)*?.*?)\}': r'(?:\1)+?',
}

View File

@ -1,4 +0,0 @@
synonyms = {
'да': ['да', 'конечно', 'ествественно'],
'нет': ['нет', 'не надо', 'отмена', 'отставить'],
}

View File

@ -1,20 +1,18 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from abc import ABC
from typing import Any
from copy import copy
from ..Pattern import Pattern
from .. import Pattern
class classproperty(property):
def __get__(self, cls, owner):
return classmethod(self.fget).__get__(None, owner)()
class VIObject(ABC):
pattern: Pattern # static getonly
stringValue: str
value: Any
formatted: str
def __init__(self, value: Any):
self.value = value
@ -22,17 +20,21 @@ class VIObject(ABC):
@classmethod
def parse(cls, fromString: str) -> VIObject:
object = cls()
object.stringValue = fromString
return cls()
object.value = fromString
return object
@classproperty
def pattern(cls) -> Pattern:
return Pattern('*')
@property
def formatted(self) -> Pattern:
return self.value
def __repr__(self):
strValue = f'"{str(self.value)}"' if type(self.value) == str else str(self.value)
return f'<{type(self).__name__} value:{strValue}>'
@classproperty
def pattern() -> Pattern:
return Pattern('*')
def __lt__(self, other: VIObject) -> bool:
return self.value < other.value

View File

@ -1,17 +1,4 @@
from .VIObject import VIObject, Pattern, classproperty
from .VIObject import VIObject
class VIString(VIObject):
value: str
def __init__(self, value: str):
self.value = value
@classmethod
def parse(cls, fromString: str):
acString = cls(value = string)
acString.stringValue = string
return acString
@classproperty
def pattern() -> Pattern:
return Pattern('*')
value: str

View File

@ -1,3 +1,20 @@
from .Pattern import *
from .Commands import *
from .VIObjects import *
from .patterns import Pattern, expressions
from .VIObjects import (
VIString,
VIWord,
VINumber,
VITime,
VITimeInterval
)
# from .Commands import (
# )
Pattern.argumentTypes = {
'VIString': VIString,
'VIWord': VIWord,
'VINumber': VINumber,
'VITime': VITime,
'VITimeInterval': VITimeInterval,
}

View File

@ -0,0 +1,65 @@
from typing import Type
from dataclasses import dataclass
import re
from .expressions import dictionary
# from ..VIObjects import VIObject
@dataclass
class MatchResult:
substring: str
groups: dict[str, str]
class Pattern:
argumentTypes: dict[str, Type['VIObject']] = {} # static
_origin: str
_compiled: str | None = None
def __init__(self, origin: str):
self._origin = origin
@property
def compiled(self) -> str: # transform Pattern to classic regex with named groups
if self._compiled: return self._compiled
pattern: str = self._origin
# transform vicore expressions to regex
for ptrn, regex in dictionary.items():
if res := re.search(ptrn, pattern):
pattern = re.sub(ptrn, regex, pattern)
# find and transform arguments like $name:Type
types = '|'.join(Pattern.argumentTypes.keys())
argumentRegex = re.compile(r'\$(?P<name>[A-z]+)\:(?P<type>(?:' + types + r'))')
argumentMatches = re.finditer(argumentRegex, pattern)
for match in argumentMatches:
argName = match.group('name')
argTypeName = match.group('type')
argType: Type['VIObject'] = Pattern.argumentTypes.get(argTypeName)
if not argType:
raise ValueError(f'Unknown type: {argTypeName} for argument: {argName} in pattern: {self._origin}')
pattern = re.sub('\\' + match.group(0), f'(?P<{argName}>{argType.pattern.compiled})', pattern)
# save and return
self._compiled = pattern
print(self._compiled)
return self._compiled
def match(self, string: str) -> MatchResult | None:
if matches := sorted(re.finditer(self.compiled, string), key = lambda m: len(m.group(0))):
match = matches[-1]
return MatchResult(match.group(0).strip(), match.groupdict())
return None

View File

@ -0,0 +1,2 @@
from . import expressions
from .Pattern import Pattern

View File

@ -0,0 +1,15 @@
alphanumerics = r'A-zА-яЁё0-9'
specials = r'\(\)\[\]\{\}'
any = alphanumerics + specials
dictionary = {
# one of the list (a|b|c)
r'\(((?:.*\|)*.*)\)': r'(?:\1)',
# one or more of the list, space-splitted {a|b|c}
r'\{((?:.*\|?)*?.*?)\}': r'(?:(?:\1)\\s?)+',
# stars *
r'\*\*': fr'[{alphanumerics}\\s]*', # ** for any few words
fr'([^{specials}]|^)\*': fr'\1[{alphanumerics}]*', # * for any word
}

View File

@ -0,0 +1 @@
from .ThreadingFunction import threadingFunction

26
pyproject.toml Normal file
View File

@ -0,0 +1,26 @@
[tool.poetry]
name = "majordom-va"
version = "0.1.0"
description = ""
authors = ["MarkParker5 <markparker.it@gmail.com>"]
readme = "README.md"
packages = [{include = "majordom_va"}]
[tool.poetry.dependencies]
python = "^3.10"
aiohttp = "^3.8.3"
sounddevice = "^0.4.5"
soundfile = "^0.11.0"
numpy = "^1.24.1"
[tool.poetry.group.dev.dependencies]
pytest = "^7.2.1"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
pythonpath = [
"majordom_va",
]

View File

@ -1,28 +0,0 @@
# Python 3.10+
# download model from https://alphacephei.com/vosk/models
# sudo apt-get install libssl-dev
# General
requests
aiohttp
# Rpi
RPi.GPIO
spidev
# voice assistant
sounddevice
soundfile
numpy
# STT
https://github.com/alphacep/vosk-api/releases/download/v0.3.42/vosk-0.3.42-py3-none-linux_aarch64.whl # vosk
# TTS
google-cloud-texttospeech
# QA
bs4
wikipedia

0
tests/conftest.py Normal file
View File

122
tests/test_patterns.py Normal file
View File

@ -0,0 +1,122 @@
import re
from VICore import Pattern
from VICore.patterns import expressions
word = f'[{expressions.alphanumerics}]*'
words = f'[{expressions.alphanumerics}\s]*'
def test_leading_star():
p = Pattern('*text')
assert p.compiled == fr'{word}text'
assert p.match('text')
assert p.match('aaatext')
assert p.match('bbb aaaatext cccc').substring == 'aaaatext'
assert not p.match('aaaaext')
p = Pattern('Some *text here')
assert p.compiled == fr'Some {word}text here'
assert p.match('Some text here')
assert p.match('Some aaatext here')
assert p.match('bbb Some aaatext here cccc').substring == 'Some aaatext here'
assert not p.match('aaatext here')
def test_trailing_star():
p = Pattern('text*')
assert p.compiled == fr'text{word}'
assert p.match('text')
assert p.match('textaaa')
assert p.match('bbb textaaa cccc').substring == 'textaaa'
p = Pattern('Some text* here')
assert p.compiled == fr'Some text{word} here'
assert p.match('Some text here')
assert p.match('Some textaaa here')
assert p.match('bbb Some textaaa here cccc').substring == 'Some textaaa here'
assert not p.match('Some textaaa ')
def test_middle_star():
p = Pattern('te*xt')
assert p.compiled == fr'te{word}xt'
assert p.match('text')
assert p.match('teaaaaaxt')
assert p.match('bbb teaaaaaxt cccc').substring == 'teaaaaaxt'
p = Pattern('Some te*xt here')
assert p.compiled == fr'Some te{word}xt here'
assert p.match('Some text here')
assert p.match('Some teaaaaaxt here')
assert p.match('bbb Some teaaeaaaxt here cccc').substring == 'Some teaaeaaaxt here'
assert not p.match('Some teaaaaaxt')
def test_double_star():
p = Pattern('**')
assert p.compiled == fr'{words}'
assert p.match('bbb teaaaaaxt cccc').substring == 'bbb teaaaaaxt cccc'
p = Pattern('Some ** here')
assert p.compiled == fr'Some {words} here'
assert p.match('Some text here')
assert p.match('Some lorem ipsum dolor here')
assert p.match('bbb Some lorem ipsum dolor here cccc').substring == 'Some lorem ipsum dolor here'
def test_one_of():
p = Pattern('(foo|bar)')
assert p.compiled == r'(?:foo|bar)'
assert p.match('foo')
assert p.match('bar')
assert p.match('bbb foo cccc').substring == 'foo'
assert p.match('bbb bar cccc').substring == 'bar'
p = Pattern('Some (foo|bar) here')
assert p.compiled == r'Some (?:foo|bar) here'
assert p.match('Some foo here')
assert p.match('Some bar here')
assert p.match('bbb Some foo here cccc').substring == 'Some foo here'
assert p.match('bbb Some bar here cccc').substring == 'Some bar here'
assert not p.match('Some foo')
def test_optional_one_of():
p = Pattern('(foo|bar)?')
assert p.compiled == r'(?:foo|bar)?'
assert p.match('foo')
assert p.match('bar')
assert p.match('')
assert p.match('bbb foo cccc').substring == 'foo'
assert p.match('bbb bar cccc').substring == 'bar'
assert p.match('bbb cccc').substring == ''
p = Pattern('Some (foo|bar)? here')
assert p.compiled == r'Some (?:foo|bar)? here'
assert p.match('Some foo here')
assert p.match('Some bar here')
assert p.match('Some here')
assert p.match('bbb Some foo here cccc').substring == 'Some foo here'
assert p.match('bbb Some bar here cccc').substring == 'Some bar here'
assert p.match('bbb Some here cccc').substring == 'Some here'
# assert Pattern('[foo|bar]').compiled == Pattern('(foo|bar)?').compiled
def test_one_or_more_of():
p = Pattern('{foo|bar}')
assert p.compiled == r'(?:(?:foo|bar)\s?)+'
assert p.match('foo')
assert p.match('bar')
assert not p.match('')
assert p.match('bbb foo cccc').substring == 'foo'
assert p.match('bbb bar cccc').substring == 'bar'
assert p.match('bbb foo bar cccc').substring == 'foo bar'
assert not p.match('bbb cccc')
p = Pattern('Some {foo|bar} here')
assert p.compiled == r'Some (?:(?:foo|bar)\s?)+ here'
assert p.match('Some foo here')
assert p.match('Some bar here')
assert not p.match('Some here')
assert p.match('bbb Some foo here cccc').substring == 'Some foo here'
assert p.match('bbb Some bar here cccc').substring == 'Some bar here'
assert p.match('bbb Some foo bar here cccc').substring == 'Some foo bar here'
assert not p.match('Some foo')
def test_typed_arguments():
p = Pattern('$name:VIString')
assert p.compiled == fr'(?P<name>{word})'