1
0
mirror of https://github.com/MarkParker5/STARK.git synced 2025-07-02 22:36:54 +02:00

replace this with self in all classes

This commit is contained in:
MarkParker5
2021-08-31 23:09:39 +03:00
parent 9ddc85c765
commit 090df2950c
14 changed files with 140 additions and 140 deletions

View File

@ -3,21 +3,21 @@ from .Response import Response
import re
class Callback:
def __init__(this, patterns, quiet = False, once = True):
this.patterns = patterns
this.quiet = quiet
this.once = once
def __init__(self, patterns, quiet = False, once = True):
self.patterns = patterns
self.quiet = quiet
self.once = once
def setStart(this, function):
this.start = function
def setStart(self, function):
self.start = function
def start(this, params):
def start(self, params):
pass
def answer(this, string):
for pattern in this.patterns:
def answer(self, string):
for pattern in self.patterns:
if match := re.search(re.compile(Command.compilePattern(pattern)), string):
return this.start({**match.groupdict(), 'string':string})
return self.start({**match.groupdict(), 'string':string})
return None
@staticmethod

View File

@ -7,16 +7,16 @@
# must return dict like {'cmd': cmd, 'params': params}
# Command.reg_find() - recognize command from a string with regexe patterns, return command object
# must return dict like {'cmd': cmd, 'params': params}
# this - object (class instance) pointer (self)
# abstract this.start() - required method for all commands
# this.keywords - dictionary of arrays keywords
# self - object (class instance) pointer (self)
# abstract self.start() - required method for all commands
# self.keywords - dictionary of arrays keywords
# like {
# (int)weight : ['word1', 'word2', 'word3'],
# (int)weight1 : ['word3', 'word4'],
# (int)weight2 : ['word5', 'word6', 'word7', 'word8', 'word9'],
# }
# this.patterns - list of command patterns
# this.subpatterns - list of subpaterns (context patterns)
# self.patterns - list of command patterns
# self.subpatterns - list of subpaterns (context patterns)
# like ['* который * час *', '* скольк* * (врем|час)* *']
# Command._entities - linked patterns $Pattern
# Command.regex - regex patterns dict for better syntax
@ -70,24 +70,24 @@ class Command(ABC):
# one or more of the list, without order {a|b|c}
'\{((?:.*\|?)*?.*?)\}': r'(?:\1)+?',
}
def __init__(this, name, keywords = {}, patterns = [], subPatterns = []): # initialisation of new command
this._name = name
this._keywords = keywords
this._patterns = patterns
this._subPatterns = subPatterns
Command.append(this)
def __init__(self, name, keywords = {}, patterns = [], subPatterns = []): # initialisation of new command
self._name = name
self._keywords = keywords
self._patterns = patterns
self._subPatterns = subPatterns
Command.append(self)
def __str__(this):
str = f'{this.__class__.__name__}.{this.getName()}:\n'
for key, value in this._keywords.items():
def __str__(self):
str = f'{self.__class__.__name__}.{self.getName()}:\n'
for key, value in self._keywords.items():
str += f'\t{key}:\t{value}\n'
return str
######################################################################################
# CONTROL KEYWORDS #
######################################################################################
def getKeyword(this, string): # return position of the keyword
for weight, array in this._keywords.items():
def getKeyword(self, string): # return position of the keyword
for weight, array in self._keywords.items():
index = 0
for word in array:
if string == word:
@ -95,24 +95,24 @@ class Command(ABC):
index += 1
return None # if not found
def removeKeyword(this, string):
position = this.getKeyword(string)
if(position): del this._keywords[ position[0] ][ position[1] ]
def removeKeyword(self, string):
position = self.getKeyword(string)
if(position): del self._keywords[ position[0] ][ position[1] ]
def addKeyword(this, weight, string): # add new keywords to end of the list
if this.getKeyword(string): return
if( this._keywords.get(weight) ): this._keywords[weight].append(string)
else: this._keywords[weight] = [string]
def addKeyword(self, weight, string): # add new keywords to end of the list
if self.getKeyword(string): return
if( self._keywords.get(weight) ): self._keywords[weight].append(string)
else: self._keywords[weight] = [string]
def changeKeyword(this, weight, name): # set new weight to keyword (find by name)
this.removeKeyword(name)
this.addKeyword(weight, name)
def changeKeyword(self, weight, name): # set new weight to keyword (find by name)
self.removeKeyword(name)
self.addKeyword(weight, name)
def checkContext(this, string): # return cmd if the string matches the cmd context
for pattern in this.getSubPatterns():
def checkContext(self, string): # return cmd if the string matches the cmd context
for pattern in self.getSubPatterns():
if match := re.search(re.compile(Command.compilePattern(pattern)), string):
return {
'cmd': this,
'cmd': self,
'params': {**match.groupdict(), 'string':string}, # return parans+initial text
}
raise Exception("Not Found") # raise exception if context not found
@ -120,29 +120,29 @@ class Command(ABC):
######################################################################################
# SETTERS #
######################################################################################
def setStart(this, function): # define start (required)
this.start = function
def setStart(self, function): # define start (required)
self.start = function
######################################################################################
# GETTERS #
######################################################################################
def getName(this):
return this._name
def getName(self):
return self._name
def getKeywords(this):
return this._keywords
def getKeywords(self):
return self._keywords
def getPatterns(this):
return this._patterns
def getPatterns(self):
return self._patterns
def getSubPatterns(this):
return this._subPatterns
def getSubPatterns(self):
return self._subPatterns
######################################################################################
# ABSTRACT METHODS #
######################################################################################
@abstractmethod
def start(this, params): # main method
def start(self, params): # main method
pass
######################################################################################

View File

@ -1,6 +1,6 @@
class Response:
def __init__(this, voice, text, callback = None, thread = None):
this.voice: str = voice
this.text: str = text
this.callback: Callback = callback
this.thread = thread
def __init__(self, voice, text, callback = None, thread = None):
self.voice: str = voice
self.text: str = text
self.callback: Callback = callback
self.thread = thread

View File

@ -1,5 +1,5 @@
from ..Command import Command
class Media(Command):
def start(this, string): # main method
def start(self, string): # main method
pass

View File

@ -7,9 +7,9 @@ import json
import re
class QA(Command):
def confirm(this, string): return True
def confirm(self, string): return True
def googleDictionary(this, word):
def googleDictionary(self, word):
responce = requests.get(f'https://api.dictionaryapi.dev/api/v2/entries/ru/{word}')
if responce.status_code == 200:
responce = json.loads(responce.content)
@ -41,32 +41,32 @@ class QA(Command):
}
return {}
def wikipedia(this, word):
def wikipedia(self, word):
wiki.set_lang("ru")
article = wiki.summary(word, sentences=5)
try: return article[:article.find('\n\n')][:600]
except: return ''
def googleSearch(this, word):
def googleSearch(self, word):
responce = requests.get(f'https://www.google.ru/search?&q={word}&lr=lang_ru&lang=ru')
page = BS(responce.content, 'html.parser')
info = page.select('div.BNeawe>div>div.BNeawe')
return info[0].get_text() if info else ''
def start(this, params):
def start(self, params):
query = params['string']
if 'вики' in query:
query = query.replace('википедия', '').replace('вики', '').strip()
try: search = this.googleSearch(query)
try: search = self.googleSearch(query)
except: search = ''
try: wiki = this.wikipedia(query) if not 'Википедия' in search else ''
try: wiki = self.wikipedia(query) if not 'Википедия' in search else ''
except: wiki = ''
try: gdict = this.googleDictionary(query)
try: gdict = self.googleDictionary(query)
except: gdict = []
voice = search or (gdict['short'] if gdict else '') or wiki
text = (f'Google Search:\n\t{search}' if search else '') + (f'\n\nWikipedia:\n\t{wiki}' if wiki else '') + ('\n\nDictionary:'+gdict['text'] if gdict else '')
else:
try: search = this.googleSearch(query)
try: search = self.googleSearch(query)
except: search = ''
voice = text = search or random.choice(['Не совсем понимаю, о чём вы.', 'Вот эта последняя фраза мне не ясна.', 'А вот это не совсем понятно.', 'Можете сказать то же самое другими словами?', 'Вот сейчас я совсем вас не понимаю.', 'Попробуйте выразить свою мысль по-другому',])
return Response(text = text, voice = voice)

View File

@ -2,7 +2,7 @@ from ..Command import Command # import parent class
import os
class Raspi(Command):
def start(this, string): # main method
def start(self, string): # main method
pass
@staticmethod

View File

@ -6,5 +6,5 @@
from ..Command import Command # import parent class
class SmallTalk(Command):
def start(this, string): # main method
def start(self, string): # main method
print(f'Hello, {string=}')

View File

@ -33,7 +33,7 @@ class SmartHome(Command):
radio = radio
send_queue = []
def start(this, string): # main method
def start(self, string): # main method
pass
@staticmethod

View File

@ -10,7 +10,7 @@ class Zieit (Command):
lessonsEndTime = ['09:15','10:45','12:25','13:55','15:25','17:05','18:35']
weekdays = ["Неділя", "Понеділок", "Вівторок", "Середа", "Четвер", "П'ятниця", "Субота"]
def start(this, string): # main method
def start(self, string): # main method
pass
@classmethod

View File

@ -3,12 +3,12 @@ from sqlite3 import Error
import config
def sqlite(func):
def wrapper(this, *args):
def wrapper(self, *args):
result = None
try:
connect = sqlite3.connect(config.db_name)
cursor = connect.cursor()
result = func(this, cursor, *args)
result = func(self, cursor, *args)
connect.commit()
except Error as er:
print('SQLite error: %s' % (' '.join(er.args)))
@ -22,45 +22,45 @@ def sqlite(func):
class DataBase:
@sqlite
def __init__(this, cursor, table_name, columns = None):
this.table_name = table_name
if not cursor.execute(f'select * from sqlite_master WHERE type = "table" AND name = "{this.table_name}"').fetchall():
def __init__(self, cursor, table_name, columns = None):
self.table_name = table_name
if not cursor.execute(f'select * from sqlite_master WHERE type = "table" AND name = "{self.table_name}"').fetchall():
if columns:
cursor.execute(f'create table if not exists {this.table_name}(id integer PRIMARY KEY, {", ".join(columns)})')
this.columns = ['id',]+columns
cursor.execute(f'create table if not exists {self.table_name}(id integer PRIMARY KEY, {", ".join(columns)})')
self.columns = ['id',]+columns
else:
this.columns = [properties[1] for properties in cursor.execute(f'pragma table_info({this.table_name})').fetchall()]
self.columns = [properties[1] for properties in cursor.execute(f'pragma table_info({self.table_name})').fetchall()]
@sqlite
def all(this, cursor):
rows = cursor.execute(f'select * from {this.table_name}').fetchall()
def all(self, cursor):
rows = cursor.execute(f'select * from {self.table_name}').fetchall()
data = []
for row in rows:
dict = {}
for i, value in enumerate(row):
dict[this.columns[i]] = value
dict[self.columns[i]] = value
data.append(dict)
return data
@sqlite
def where(this, cursor, condition):
return cursor.execute(f'select * from {this.table_name} where {condition}')
def where(self, cursor, condition):
return cursor.execute(f'select * from {self.table_name} where {condition}')
@sqlite
def count(this, count):
return len(this.all())
def count(self, count):
return len(self.all())
@sqlite
def update(this, cursor, values, where):
def update(self, cursor, values, where):
updates = " ".join([f'{key} = "{value}"' for key, value in values.items()])
cursor.execute(f'update {this.table_name} set {updates} where {where}')
cursor.execute(f'update {self.table_name} set {updates} where {where}')
@sqlite
def insert(this, cursor, values):
def insert(self, cursor, values):
values = ['"'+v+'"' for v in values]
cursor.execute(f'insert into {this.table_name}({", ".join(this.columns[1:])}) values({", ".join(values)})')
cursor.execute(f'insert into {self.table_name}({", ".join(self.columns[1:])}) values({", ".join(values)})')
@sqlite
def drop(this, cursor):
cursor.execute(f'drop table if exists {this.table_name}')
def drop(self, cursor):
cursor.execute(f'drop table if exists {self.table_name}')

View File

@ -5,31 +5,31 @@ import config
#m = sr.Microphone(device_index=config.device_index)
class SpeechToText:
def __init__(this, device = config.device_index, language = config.language_code):
this.device = device
this.language = language
this.m = sr.Microphone(device_index = this.device)
this.r = sr.Recognizer()
this.r.pause_threshold = config.pause_threshold
this.r.energy_threshold = config.energy_threshold
this.r.dynamic_energy_threshold = config.dynamic_energy_threshold
this.r.non_speaking_duration = config.non_speaking_duration
def __init__(self, device = config.device_index, language = config.language_code):
self.device = device
self.language = language
self.m = sr.Microphone(device_index = self.device)
self.r = sr.Recognizer()
self.r.pause_threshold = config.pause_threshold
self.r.energy_threshold = config.energy_threshold
self.r.dynamic_energy_threshold = config.dynamic_energy_threshold
self.r.non_speaking_duration = config.non_speaking_duration
def listen(this):
def listen(self):
try:
with this.m as source:
audio = this.r.listen(source)
with self.m as source:
audio = self.r.listen(source)
except:
return ''
try:
responce = {'text': this.r.recognize_google(audio, language = this.language).lower(), 'status': 'ok'}
responce = {'text': self.r.recognize_google(audio, language = self.language).lower(), 'status': 'ok'}
except sr.UnknownValueError:
responce = {'text': None, 'status': 'void'}
except sr.RequestError:
responce = {'text': None, 'status': 'error'}
return responce
def recognize(this, speech):
def recognize(self, speech):
with sr.AudioFile(speech.getPath()) as source:
audio = r.record(source)
try:
@ -37,10 +37,10 @@ class SpeechToText:
except:
return ''
def listen_noise(this):
with this.m as source:
this.r.adjust_for_ambient_noise(source)
def listen_noise(self):
with self.m as source:
self.r.adjust_for_ambient_noise(source)
def set_device(this, index):
this.device = 1
this.m = sr.Microphone(device_index = this.device)
def set_device(self, index):
self.device = 1
self.m = sr.Microphone(device_index = self.device)

View File

@ -7,16 +7,16 @@ import config
class Speech:
_list = []
def __init__(this, text, voice, path, standart = False):
this._text = text
this._voice = voice
this._path = path
this._standart = standart
if(standart): Speech.append(this)
def __init__(self, text, voice, path, standart = False):
self._text = text
self._voice = voice
self._path = path
self._standart = standart
if(standart): Speech.append(self)
def speak(this):
if( os.path.exists(this._path) ):
with open(this._path) as f:
def speak(self):
if( os.path.exists(self._path) ):
with open(self._path) as f:
with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as audio:
mixer.init()
mixer.music.load(audio)
@ -24,13 +24,13 @@ class Speech:
mixer.music.play()
while mixer.music.get_busy():
sleep(0.1)
if(not this._standart): os.remove(this._path)
if(not self._standart): os.remove(self._path)
def getBytes(this):
return open(this._path, 'rb')
def getBytes(self):
return open(self._path, 'rb')
def getPath(this):
return this._path
def getPath(self):
return self._path
@staticmethod
def append(obj):
@ -41,15 +41,15 @@ class Speech:
return Speech._list
class Engine:
def __init__(this, name = 'ru-RU-Wavenet-B', language_code = config.language_code):
def __init__(self, name = 'ru-RU-Wavenet-B', language_code = config.language_code):
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = config.goole_tts_json_key
this._client = texttospeech.TextToSpeechClient()
this._audio_config = texttospeech.AudioConfig( audio_encoding = texttospeech.AudioEncoding.MP3 )
this._language_code= language_code
this._name = name
this._voice = texttospeech.VoiceSelectionParams(
language_code = this._language_code,
name = this._name,
self._client = texttospeech.TextToSpeechClient()
self._audio_config = texttospeech.AudioConfig( audio_encoding = texttospeech.AudioEncoding.MP3 )
self._language_code= language_code
self._name = name
self._voice = texttospeech.VoiceSelectionParams(
language_code = self._language_code,
name = self._name,
ssml_gender = texttospeech.SsmlVoiceGender.FEMALE)
@staticmethod
@ -67,16 +67,16 @@ class Engine:
else: name = name.replace(letter, '_')
return name
def generate(this, text, standart = False):
dir = f'audio/{this._name}'
def generate(self, text, standart = False):
dir = f'audio/{self._name}'
path = f'{dir}/{Engine.transliterate(text)[:100]}.mp3'
if( os.path.exists(path) ): return Speech(text, this._name, path, standart)
if( os.path.exists(path) ): return Speech(text, self._name, path, standart)
synthesis_input = texttospeech.SynthesisInput(text=text)
try:
response = this._client.synthesize_speech(input = synthesis_input, voice = this._voice, audio_config = this._audio_config)
response = self._client.synthesize_speech(input = synthesis_input, voice = self._voice, audio_config = self._audio_config)
if not os.path.exists(dir): os.makedirs(dir)
with open(path, 'wb') as out:
out.write(response.audio_content)
except:
print("[ERROR] TTS Error: google cloud tts response error. Check Cloud Platform Console")
return Speech(text, this._name, path, standart)
return Speech(text, self._name, path, standart)

View File

@ -5,7 +5,7 @@ def getClass(name):
from Command import Command # import parent class
class {name} (Command):
def start(this, string): # main method
def start(self, string): # main method
pass
'''
return str.strip()

View File

@ -9,5 +9,5 @@ import Features
# start oll controls in own thread or subprocess:
# voice assistant, telegram bot, django(api and ui)
#Controls.TelegramBot().start()
Controls.VoiceAssistant().start()
Controls.TelegramBot().start()
#Controls.VoiceAssistant().start()