Mirror of https://github.com/MarkParker5/STARK.git — synced 2026-04-23 19:31:10 +02:00
211 lines · 5.8 KiB · Python
import random
|
|
import time
|
|
|
|
import pytest
|
|
from faker import Faker
|
|
from pympler.asizeof import asizeof
|
|
|
|
from stark.tools.dictionary.dictionary import Dictionary, LookupMode
|
|
from stark.tools.dictionary.storage.storage_memory import (
|
|
DictionaryStorageMemory,
|
|
)
|
|
from stark.tools.dictionary.storage.storage_sqlite import (
|
|
DictionaryStorageSQLite,
|
|
)
|
|
|
|
|
|
# Benchmark
@pytest.mark.timeout(60.0 * 10)
@pytest.mark.benchmark(
    timer=time.monotonic,
    min_time=1,
    max_time=1.0,
    min_rounds=10,
)
# Report
@pytest.mark.report_duration
@pytest.mark.report_tracemalloc
# Parametrize
@pytest.mark.parametrize(
    "dict_size",
    [
        100,
        1_000,
        10_000,
        100_000,
        1_000_000,
        10_000_000,
        100_000_000,
    ],
)
@pytest.mark.parametrize(
    "success",
    [True, False],
)
@pytest.mark.parametrize(
    "lookup_mode",
    [
        # LookupMode.EXACT,
        # LookupMode.CONTAINS,
        # LookupMode.FUZZY,
        LookupMode.AUTO,
    ],
)
@pytest.mark.parametrize(
    "lookup_func",
    [
        "lookup",
        "lookup_sorted",
        "search_in_sentence",
        "search_in_sentence_sorted",
    ],
)
@pytest.mark.parametrize("storage_type", ["sqlite"])  # , "memory"])
# Other
def test_benchmark__dictionary(
    # benchmark cases
    dict_size: int,
    success: bool,
    lookup_mode: LookupMode,
    lookup_func: str,
    storage_type: str,
    # fixtures
    benchmark,
    # additional parameters
    seed: int | None = None,
):
    """Benchmark Dictionary lookup/search across backends, sizes and modes.

    Builds (or reuses, for the file-backed SQLite storage) a dictionary of
    ``dict_size`` fake entries, then times one of the four lookup functions.
    When ``success`` is True the searched names are written into the
    dictionary first so a hit is guaranteed; otherwise the names are absent
    and the result must be empty.
    """
    # Seed BEFORE the first random draw so that a given seed reproduces the
    # whole run, including the entity-type choice below. (Previously the
    # seeding happened after ne_type was drawn, breaking reproducibility.)
    if seed is not None:
        random.seed(seed)
        Faker.seed(seed)

    # Params that are not part of the parametrized cases, randomly generated.
    ne_type = random.choice(["name", "place"])
    # print(f"{success=}, {ne_type=}, {targets_amount=}")

    fake = Faker("en")

    def get_random_entry() -> str:
        # Produce one fake dictionary entry of the chosen entity type.
        # NOTE(review): fake.unique raises UniquenessException once the name
        # pool is exhausted — relevant for the largest dict_size values.
        if ne_type == "name":
            return fake.unique.name()
        elif ne_type == "place":
            return random.choice(
                [
                    fake.street_name(),
                    fake.city(),
                    fake.state(),
                    fake.country(),
                    fake.location_on_land()[2],
                ]
            )
        else:
            raise ValueError(f"Invalid entity type: {ne_type}")

    # Prepare the dictionary

    if storage_type == "memory":
        dictionary = Dictionary(DictionaryStorageMemory())
    elif storage_type == "sqlite":
        # dictionary = Dictionary(DictionaryStorageSQLite(":memory:"))
        # File-backed DB is reused between runs (see the count check below).
        dictionary = Dictionary(DictionaryStorageSQLite(f"sqlite3://data/test_dictionary_{ne_type}_{dict_size}.sqlite3"))
    else:
        raise ValueError(f"Invalid storage type: {storage_type}")

    # Fill the dictionary only when the backing storage is still empty.
    if dictionary.storage.get_count() == 0:
        for i in range(dict_size):
            dictionary.write_one(language_code="en", name=get_random_entry(), metadata={"idx": i})

    # Log RAM usage of the full dictionary after build.
    # True division keeps the fractional megabytes so the :.2f format is
    # meaningful (floor division always printed "NN.00").
    dictionary_ram = asizeof(dictionary) / 1024**2  # MB
    print(f"RAM usage after loading {dict_size} entries: {dictionary_ram:.2f} MB")
    assert dictionary_ram < 900, "RAM usage exceeded 900MB"

    # Select name(s) to search

    def get_targets() -> list[str]:
        # Mostly a single target; occasionally 2-3 to exercise multi-hit paths.
        targets_amount = random.choice([1, 1, 1, 2, 3])
        targets: list[str] = []
        for i in range(targets_amount):
            name = get_random_entry()
            targets.append(name)
            if success:
                # Guarantee a hit by writing the target into the dictionary.
                dictionary.write_one(language_code="en", name=name, metadata={"idx": f"x{i}"})
        return targets

    # Prepare sentence

    def get_sentence(inject_targets: list[str]) -> str:
        # Random filler sentence with each target spliced in after a random word.
        sentence_length = random.randint(5, 15)
        sentence = fake.sentence(nb_words=sentence_length)
        words = sentence.split()
        for target in inject_targets:
            index = random.randint(0, len(words) - 1)
            words[index] += " " + target
        return " ".join(words)

    # Run benchmarks

    def execute_lookup():
        # One benchmark iteration: pick targets, build a sentence, query.
        targets = get_targets()
        sentence = get_sentence(targets)

        if lookup_func == "lookup":
            return list(dictionary.lookup(targets[0], "en", mode=lookup_mode))
        elif lookup_func == "lookup_sorted":
            return list(dictionary.lookup_sorted(targets[0], "en", mode=lookup_mode))
        elif lookup_func == "search_in_sentence":
            return list(dictionary.search_in_sentence(sentence, "en", mode=lookup_mode))
        elif lookup_func == "search_in_sentence_sorted":
            return list(dictionary.search_in_sentence_sorted(sentence, "en", mode=lookup_mode))
        else:
            raise ValueError(f"Invalid lookup function: {lookup_func}")

    # benchmark may be falsy when the plugin is disabled; run once directly then.
    if benchmark:
        result = benchmark(execute_lookup)
    else:
        result = execute_lookup()

    if success:
        assert result
    else:
        assert not result
|
|
|
|
|
|
# @pytest.mark.benchmark
# @pytest.mark.parametrize(
#     "to_ipa", [to_ipa__espeak_cli, to_ipa__espeak_bin, to_ipa__epitran]
# )
# def test_benchmark__to_ipa(benchmark, to_ipa: Callable[[str, str], str]):
#     from faker import Faker

#     locales = {
#         "en": "en_US",
#         "es": "es_ES",
#         "fr": "fr_FR",
#         "de": "de_DE",
#         "it": "it_IT",
#         "uk": "uk_UA",
#         "ru": "ru_RU",
#         "nl": "nl_NL",
#     }

#     faker_objects = {lang: Faker(loc) for lang, loc in locales.items()}

#     test_cases = [
#         f"{lang}:{faker_objects[lang].sentence(nb_words=faker_objects[lang].random_int(min=3, max=30))}"
#         for lang in locales
#         for _ in range(10)
#     ]

#     # print(test_cases)

#     def test():
#         for case in test_cases:
#             language, text = case.split(":")
#             assert to_ipa(text, language)

#     test()  # warm up to instantiate lazy cached deps
#     benchmark(test)  # run the benchmark
|