1
0
mirror of https://github.com/MarkParker5/STARK.git synced 2024-11-24 08:12:13 +02:00

test create run sync/async command

This commit is contained in:
Mark Parker 2023-09-16 06:04:00 +02:00
parent e49029aef1
commit 961e6aa1e0
No known key found for this signature in database
GPG Key ID: C10A60786A07A300
6 changed files with 257 additions and 212 deletions

View File

@ -112,12 +112,13 @@ async def commands_context_flow_filled(commands_context_flow):
@pytest.fixture
async def voice_assistant(commands_context_flow_filled):
context, _ = commands_context_flow_filled
voice_assistant = VoiceAssistant(
speech_recognizer = SpeechRecognizerMock(),
speech_synthesizer = SpeechSynthesizerMock(),
commands_context = context
)
voice_assistant.start()
yield voice_assistant
@contextlib.asynccontextmanager
async def _voice_assistant() -> AsyncGenerator[VoiceAssistant, None]:
async with commands_context_flow_filled() as (context, context_delegate):
voice_assistant = VoiceAssistant(
speech_recognizer = SpeechRecognizerMock(),
speech_synthesizer = SpeechSynthesizerMock(),
commands_context = context
)
yield voice_assistant
return _voice_assistant

View File

@ -2,8 +2,16 @@ import pytest
from core import CommandsManager, Response
@pytest.mark.asyncio
async def test_call_async_command_from_command():
async def test_create_run_async_command():
manager = CommandsManager()
@manager.new('foo')
async def foo() -> Response:
return Response(text = 'foo!')
assert (await foo()).text == 'foo!'
async def test_call_async_command_from_async_command():
manager = CommandsManager()
@manager.new('foo')

View File

@ -1,40 +0,0 @@
import pytest
pytestmark = pytest.mark.skip(reason = 'Background mode is deprecated')
def test_background_command(commands_context_flow_filled):
context, context_delegate = commands_context_flow_filled
context.process_string('background min')
assert len(context_delegate.responses) == 1
assert context_delegate.responses.pop(0).text == 'Starting background task'
assert len(context._threads) == 1
context._check_threads()
assert len(context_delegate.responses) == 1
assert context_delegate.responses.pop(0).text == 'Finished background task'
def test_background_with_multiple_responses(commands_context_flow_filled):
context, context_delegate = commands_context_flow_filled
context.process_string('background multiple responses')
assert len(context_delegate.responses) == 1
assert context_delegate.responses.pop(0).text == 'Starting long background task'
assert len(context._threads) == 1
responses = ['First response', 'Second response']
while context._threads[0].thread.is_alive():
if context_delegate.responses:
assert len(context_delegate.responses) == 1
assert context_delegate.responses.pop(0).text == responses.pop(0)
assert len(context_delegate.responses) == 0
assert len(responses) == 0
context._check_threads()
assert len(context_delegate.responses) == 1
assert context_delegate.responses.pop(0).text == 'Finished long background task'

View File

@ -0,0 +1,26 @@
import pytest
import anyio
@pytest.mark.skip(reason='Test is not implemented')
async def test_command_return_respond():
raise NotImplementedError()
@pytest.mark.skip(reason='Test is not implemented')
async def test_sync_command_call_sync_respond():
raise NotImplementedError()
@pytest.mark.skip(reason='Test is not implemented')
async def test_async_command_call_sync_respond():
raise NotImplementedError()
@pytest.mark.skip(reason='Test is not implemented')
async def test_sync_command_call_async_respond():
raise NotImplementedError()
@pytest.mark.skip(reason='Test is not implemented')
async def test_async_command_call_async_respond():
raise NotImplementedError()
@pytest.mark.skip(reason='Test is not implemented')
async def test_command_multiple_respond():
raise NotImplementedError()

View File

@ -0,0 +1,60 @@
import pytest
from asyncer import syncify
from core import CommandsManager, Response
async def test_create_await_run_sync_command():
manager = CommandsManager()
@manager.new('foo')
def foo() -> Response:
return Response(text = 'foo!')
# commands are always async, no matter runner function is sync or async
# so we need to await or syncify them
assert (await foo()).text == 'foo!'
def test_create_syncify_run_sync_command():
manager = CommandsManager()
@manager.new('foo')
def foo() -> Response:
return Response(text = 'foo!')
# commands are always async, no matter runner function is sync or async
# so we need to await or syncify them
# raise_sync_error needs for tests only because test function doesn't run in worker thread
sync_foo = syncify(foo, raise_sync_error = False)
assert sync_foo().text == 'foo!'
async def test_call_sync_command_from_sync_command_await():
manager = CommandsManager()
@manager.new('foo')
def foo() -> Response:
return Response(text = 'foo!')
@manager.new('bar')
def bar() -> Response:
# commands are always async, so we need to syncify async foo (or delcare current function as async and await foo, check /choice-runner-type)
sync_foo = syncify(foo)
return sync_foo()
assert (await bar()).text == 'foo!'
def test_call_sync_command_from_sync_command_syncify():
manager = CommandsManager()
@manager.new('foo')
def foo() -> Response:
return Response(text = 'foo!')
@manager.new('bar')
def bar() -> Response:
# commands are always async, so we need to syncify async foo (or delcare current function as async and await foo, check /choice-runner-type)
sync_foo = syncify(foo)
return sync_foo()
# raise_sync_error needs for tests only because test function doesn't run in worker thread
sync_bar = syncify(bar, raise_sync_error = False)
assert sync_bar().text == 'foo!'

View File

@ -1,171 +1,161 @@
import pytest
import anyio
from datetime import timedelta
from voice_assistant import Mode
pytestmark = pytest.mark.skip(reason = 'Background mode is deprecated. TODO: test VA modes with new concurrency approach')
pytestmark = pytest.mark.skip(reason = 'Test is not implemented yet')
def test_background_command(voice_assistant):
voice_assistant.speech_recognizer_did_receive_final_result('background min')
# start background task
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Starting background task'
assert len(voice_assistant.commands_context._threads) == 1
# check finished background task
voice_assistant.commands_context._check_threads()
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Finished background task'
# check saved response
assert len(voice_assistant._responses) == 0
async def test_background_command_with_waiting_mode(voice_assistant, autojump_clock):
async with voice_assistant() as voice_assistant:
voice_assistant.speech_recognizer_did_receive_final_result('background min')
# start background task
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Starting background task'
# assert len(voice_assistant.commands_context._threads) == 1
# force a timeout
voice_assistant._last_interaction_time -= timedelta(seconds = voice_assistant.mode.timeout_after_interaction + 1)
# check finished background task
# voice_assistant.commands_context._check_threads()
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Finished background task'
# check saved response
assert len(voice_assistant._responses) == 1
# emulate delay after last response before repeating
voice_assistant._responses[0].time -= timedelta(seconds = voice_assistant.mode.timeout_before_repeat + 1)
# interact to reset timeout mode and repeat saved response
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
assert len(voice_assistant.speech_synthesizer.results) == 2
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Hello, world!'
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Finished background task'
def test_background_command_with_waiting_mode(voice_assistant):
voice_assistant.speech_recognizer_did_receive_final_result('background min')
async def test_background_command_with_inactive_mode(voice_assistant, autojump_clock):
async with voice_assistant() as voice_assistant:
voice_assistant.speech_recognizer_did_receive_final_result('background min')
# start background task
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Starting background task'
# assert len(voice_assistant.commands_context._threads) == 1
# set inactive mode
voice_assistant.mode = Mode.inactive
# check finished background task
# voice_assistant.commands_context._check_threads()
assert len(voice_assistant.speech_synthesizer.results) == 0
# check saved response
assert len(voice_assistant._responses) == 1
# interact to reset timeout mode and repeat saved response
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
assert len(voice_assistant.speech_synthesizer.results) == 2
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Hello, world!'
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Finished background task'
# start background task
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Starting background task'
assert len(voice_assistant.commands_context._threads) == 1
# force a timeout
voice_assistant._last_interaction_time -= timedelta(seconds = voice_assistant.mode.timeout_after_interaction + 1)
# check finished background task
voice_assistant.commands_context._check_threads()
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Finished background task'
# check saved response
assert len(voice_assistant._responses) == 1
# emulate delay after last response before repeating
voice_assistant._responses[0].time -= timedelta(seconds = voice_assistant.mode.timeout_before_repeat + 1)
# interact to reset timeout mode and repeat saved response
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
assert len(voice_assistant.speech_synthesizer.results) == 2
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Hello, world!'
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Finished background task'
def test_background_command_with_inactive_mode(voice_assistant):
voice_assistant.speech_recognizer_did_receive_final_result('background min')
# start background task
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Starting background task'
assert len(voice_assistant.commands_context._threads) == 1
# set inactive mode
voice_assistant.mode = Mode.inactive
# check finished background task
voice_assistant.commands_context._check_threads()
assert len(voice_assistant.speech_synthesizer.results) == 0
# check saved response
assert len(voice_assistant._responses) == 1
# interact to reset timeout mode and repeat saved response
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
assert len(voice_assistant.speech_synthesizer.results) == 2
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Hello, world!'
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Finished background task'
def test_background_waiting_needs_input(voice_assistant):
voice_assistant.speech_recognizer_did_receive_final_result('background needs input')
assert len(voice_assistant.commands_context._threads) == 1
# force a timeout
voice_assistant._last_interaction_time -= timedelta(seconds = voice_assistant.mode.timeout_after_interaction + 1)
# wait for thread to finish
voice_assistant.commands_context._threads[0].thread.join()
# receive context output
voice_assistant.commands_context._check_threads()
# voice assistant should save all responses for later
assert len(voice_assistant._responses) == 8
assert len(voice_assistant.speech_synthesizer.results) == 8
voice_assistant.speech_synthesizer.results.clear()
# emulate delay after last response before repeating
for response in voice_assistant._responses:
response.time -= timedelta(seconds = voice_assistant.mode.timeout_before_repeat + 1)
# interact to reset timeout mode
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
# voice assistant should say all responses until needs input
assert len(voice_assistant.speech_synthesizer.results) == 5
assert len(voice_assistant._responses) == 4
for response in ['Hello, world!', 'First response', 'Second response', 'Third response', 'Needs input']:
assert voice_assistant.speech_synthesizer.results.pop(0).text == response
# interact to emulate user input and continue repeating responses
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
# voice assistant should say all left responses
assert len(voice_assistant.speech_synthesizer.results) == 5
assert len(voice_assistant._responses) == 0
for response in ['Hello, world!', 'Fourth response', 'Fifth response', 'Sixth response', 'Finished long background task']:
assert voice_assistant.speech_synthesizer.results.pop(0).text == response
def test_background_waiting_with_context(voice_assistant):
voice_assistant.speech_recognizer_did_receive_final_result('background with context')
assert len(voice_assistant.commands_context._threads) == 1
# force a timeout
voice_assistant._last_interaction_time -= timedelta(seconds = voice_assistant.mode.timeout_after_interaction + 1)
# wait for thread to finish
voice_assistant.commands_context._threads[0].thread.join()
# receive context output
voice_assistant.commands_context._check_threads()
# voice assistant should play all (including adding context) and save all responses for later
assert len(voice_assistant.speech_synthesizer.results) == 2 # 2 = first and returned
assert len(voice_assistant._responses) == 1 # first response is not saved because it plays immediately
assert len(voice_assistant.commands_context._context_queue) == 2
voice_assistant.speech_synthesizer.results.clear()
# emulate delay after last response before repeating
for response in voice_assistant._responses:
response.time -= timedelta(seconds = voice_assistant.mode.timeout_before_repeat + 1)
# interact to reset timeout mode, voice assistant should reset context, repeat responses and add response context
voice_assistant.speech_recognizer_did_receive_final_result('lorem ipsum dolor')
assert len(voice_assistant.speech_synthesizer.results) == 2
assert len(voice_assistant.commands_context._context_queue) == 2
def test_background_waiting_remove_response(voice_assistant):
voice_assistant.speech_recognizer_did_receive_final_result('background remove response')
assert len(voice_assistant.commands_context._threads) == 1
voice_assistant.speech_synthesizer.results.clear()
# force a timeout by settings zero time
voice_assistant._last_interaction_time -= timedelta(seconds = voice_assistant.mode.timeout_after_interaction + 1)
# catch all responses
responses_cache = voice_assistant._responses.copy()
while voice_assistant.commands_context._threads[0].thread.is_alive():
async def test_background_waiting_needs_input(voice_assistant, autojump_clock):
async with voice_assistant() as voice_assistant:
voice_assistant.speech_recognizer_did_receive_final_result('background needs input')
# assert len(voice_assistant.commands_context._threads) == 1
# force a timeout
voice_assistant._last_interaction_time -= timedelta(seconds = voice_assistant.mode.timeout_after_interaction + 1)
# wait for thread to finish
# voice_assistant.commands_context._threads[0].thread.join()
# receive context output
# voice_assistant.commands_context._check_threads()
# voice assistant should save all responses for later
assert len(voice_assistant._responses) == 8
assert len(voice_assistant.speech_synthesizer.results) == 8
voice_assistant.speech_synthesizer.results.clear()
# emulate delay after last response before repeating
for response in voice_assistant._responses:
if not response in responses_cache:
responses_cache.append(response)
response.time -= timedelta(seconds = voice_assistant.mode.timeout_before_repeat + 1)
# interact to reset timeout mode
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
# voice assistant should say all responses until needs input
assert len(voice_assistant.speech_synthesizer.results) == 5
assert len(voice_assistant._responses) == 4
for response in ['Hello, world!', 'First response', 'Second response', 'Third response', 'Needs input']:
assert voice_assistant.speech_synthesizer.results.pop(0).text == response
# interact to emulate user input and continue repeating responses
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
# voice assistant should say all left responses
assert len(voice_assistant.speech_synthesizer.results) == 5
assert len(voice_assistant._responses) == 0
for response in ['Hello, world!', 'Fourth response', 'Fifth response', 'Sixth response', 'Finished long background task']:
assert voice_assistant.speech_synthesizer.results.pop(0).text == response
# check response cached, removed and will not be repeated
assert len(responses_cache) == 1
assert len(voice_assistant.speech_synthesizer.results) == 1
assert responses_cache.pop(0).text == 'Deleted response'
assert len(voice_assistant._responses) == 0
voice_assistant.speech_synthesizer.results.clear()
# interact to reset timeout mode, check that removed response is not repeated
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Hello, world!'
async def test_background_waiting_with_context(voice_assistant, autojump_clock):
async with voice_assistant() as voice_assistant:
voice_assistant.speech_recognizer_did_receive_final_result('background with context')
# assert len(voice_assistant.commands_context._threads) == 1
# force a timeout
voice_assistant._last_interaction_time -= timedelta(seconds = voice_assistant.mode.timeout_after_interaction + 1)
# wait for thread to finish
# voice_assistant.commands_context._threads[0].thread.join()
# receive context output
# voice_assistant.commands_context._check_threads()
# voice assistant should play all (including adding context) and save all responses for later
assert len(voice_assistant.speech_synthesizer.results) == 2 # 2 = first and returned
assert len(voice_assistant._responses) == 1 # first response is not saved because it plays immediately
assert len(voice_assistant.commands_context._context_queue) == 2
voice_assistant.speech_synthesizer.results.clear()
# emulate delay after last response before repeating
for response in voice_assistant._responses:
response.time -= timedelta(seconds = voice_assistant.mode.timeout_before_repeat + 1)
# interact to reset timeout mode, voice assistant should reset context, repeat responses and add response context
voice_assistant.speech_recognizer_did_receive_final_result('lorem ipsum dolor')
assert len(voice_assistant.speech_synthesizer.results) == 2
assert len(voice_assistant.commands_context._context_queue) == 2
async def test_background_waiting_remove_response(voice_assistant, autojump_clock):
async with voice_assistant() as voice_assistant:
voice_assistant.speech_recognizer_did_receive_final_result('background remove response')
# assert len(voice_assistant.commands_context._threads) == 1
voice_assistant.speech_synthesizer.results.clear()
# force a timeout by settings zero time
voice_assistant._last_interaction_time -= timedelta(seconds = voice_assistant.mode.timeout_after_interaction + 1)
# catch all responses
responses_cache = voice_assistant._responses.copy()
# while voice_assistant.commands_context._threads[0].thread.is_alive():
# for response in voice_assistant._responses:
# if not response in responses_cache:
# responses_cache.append(response)
# check response cached, removed and will not be repeated
assert len(responses_cache) == 1
assert len(voice_assistant.speech_synthesizer.results) == 1
assert responses_cache.pop(0).text == 'Deleted response'
assert len(voice_assistant._responses) == 0
voice_assistant.speech_synthesizer.results.clear()
# interact to reset timeout mode, check that removed response is not repeated
voice_assistant.speech_recognizer_did_receive_final_result('hello world')
assert len(voice_assistant.speech_synthesizer.results) == 1
assert voice_assistant.speech_synthesizer.results.pop(0).text == 'Hello, world!'