You've already forked 1cai-public
mirror of
https://github.com/DmitrL-dev/1cai-public.git
synced 2026-04-29 03:11:55 +02:00
MCP Tools Cache
Модуль кэширования результатов MCP tools для 1C MCP сервера, основанный на современных стандартах кэширования и анализе производительности.
📋 Содержание
- Возможности
- Быстрый старт
- Архитектура
- Конфигурация
- Использование
- Интеграция
- Мониторинг
- Примеры
- Тестирование
- Производительность
✨ Возможности
Ключевые особенности
- TTL стратегии: 30 минут для стабильных данных, 5 минут для динамических
- Максимальный размер: 100MB (настраивается)
- Кэширование только успешных запросов
- Метрики попаданий/промахов
- Многоуровневое кэширование: память + persistent cache на диске
- Стратегии вытеснения: LRU и TTL-based
- Механизмы инвалидации: по шаблонам, сущностям, событиям
Поддерживаемые типы данных
- Метаданные 1С: структуры справочников, документов, регистров
- Агрегированные данные: отчёты, итоги, обороты
- Конфигурации инструментов: схемы и параметры MCP tools
- Результаты API: ответы на запросы к 1С
- Стабильные данные: редко изменяющиеся справочники
- Динамические данные: часто обновляемые записи
🚀 Быстрый старт
Установка
# Копирование модулей
cp -r cache/ /path/to/your/project/
cp tests/test_mcp_cache.py /path/to/your/project/tests/
Базовая инициализация
from cache import init_cache, cached, cache_tool_result
# Инициализация кэша
cache = init_cache(
max_size_mb=100,
default_ttl_stable=30 * 60, # 30 минут
default_ttl_dynamic=5 * 60, # 5 минут
persistent_cache_dir="./cache_data"
)
# Использование декоратора
@cached(ttl=300, data_type='stable')
def get_catalog_structure(catalog_name):
# Дорогостоящая операция получения структуры
return fetch_from_1c(catalog_name)
# Кэширование результата MCP tool
result = execute_tool("get_nomenclature", {"id": 123})
cache_tool_result("get_nomenclature", {"id": 123}, result)
🏗️ Архитектура
Основные компоненты
cache/
├── __init__.py # Экспорт API
├── mcp_cache.py # Основной модуль (MCPToolsCache и др.)
├── integration_examples.py # Примеры интеграции
└── README.md # Документация
Классы модуля
- MCPToolsCache - основной класс кэширования
- CacheStrategy - абстрактный базовый класс стратегий
- LRUStrategy - стратегия Least Recently Used
- TTLCacheStrategy - стратегия на основе TTL
- CacheInvalidation - механизмы инвалидации
- PersistentCache - долговременное кэширование на диске
- CacheEntry - запись кэша с метаданными
- CacheMetrics - метрики производительности
Слой кэширования
┌─────────────────────────────────────┐
│ Application Layer │
│ (mcp_server.py, onec_client.py) │
└─────────────────┬───────────────────┘
│
┌─────────────────▼───────────────────┐
│ MCPToolsCache │
│ - In-Memory Cache (LRU/TTL) │
│ - Persistent Cache (disk) │
│ - Metrics & Monitoring │
└─────────────────┬───────────────────┘
│
┌─────────────────▼───────────────────┐
│ Storage Layer │
│ - Memory (OrderedDict) │
│ - Disk (pickle + index.json) │
└─────────────────────────────────────┘
⚙️ Конфигурация
Параметры инициализации
cache = init_cache(
max_size_mb=100, # Максимальный размер кэша
default_ttl_stable=1800, # TTL стабильных данных (секунды)
default_ttl_dynamic=300, # TTL динамических данных (секунды)
persistent_cache_dir="./cache", # Директория для persistent cache
strategy=None # Стратегия вытеснения
)
Типы данных и их конфигурация
# Автоматическая конфигурация по типам
data_type_configs = {
'metadata': {
'ttl': default_ttl_stable, # 30 минут
'persistent': True # Сохранять на диск
},
'aggregates': {
'ttl': default_ttl_dynamic, # 5 минут
'persistent': False # Только в памяти
},
'tool_config': {
'ttl': default_ttl_stable,
'persistent': True
},
'api_response': {
'ttl': default_ttl_dynamic,
'persistent': False
}
}
Настройка стратегий вытеснения
from cache import LRU, TTLCacheStrategy
# LRU стратегия (вытесняет давно неиспользуемые)
lru_strategy = LRU()
# TTL стратегия (вытесняет истёкшие)
ttl_strategy = TTLCacheStrategy()
cache = init_cache(strategy=ttl_strategy)
📖 Использование
Базовые операции
from cache import get_cache
cache = get_cache()
# Сохранение данных
success = cache.set("key", "value", ttl=300, data_type='stable')
# Получение данных
value = cache.get("key", data_type='stable')
# Проверка наличия
exists = cache.has("key")
# Удаление
deleted = cache.delete("key")
# Очистка всего кэша
cache.clear()
Асинхронные операции
async def async_example():
cache = get_cache()
# Асинхронное сохранение
await cache.set_async("async_key", "async_value")
# Асинхронное получение
value = await cache.get_async("async_key")
# Асинхронная проверка
exists = await cache.has_async("async_key")
Декораторы кэширования
from cache import cached, cached_async
# Синхронная функция
@cached(ttl=600, data_type='stable')
def get_user_info(user_id):
# Дорогостоящая операция
return fetch_user_from_1c(user_id)
# Асинхронная функция
@cached_async(ttl=300, data_type='dynamic')
async def get_report_data(report_id):
# Асинхронная операция
return await fetch_report_from_1c(report_id)
# Пользовательский генератор ключей
def user_cache_key(user_id, include_inactive=False):
return f"user:{user_id}:inactive={include_inactive}"
@cached(key_func=user_cache_key, ttl=1800)
def get_user_profile(user_id, include_inactive=False):
return fetch_profile(user_id, include_inactive)
Специализированные функции
from cache import (
cache_tool_result, get_cached_tool_result,
cache_metadata_1c, get_cached_metadata_1c,
cache_aggregates, get_cached_aggregates
)
# Кэширование результатов MCP tools
tool_result = execute_tool("get_catalog_info", {"catalog": "Пользователи"})
cache_tool_result("get_catalog_info", {"catalog": "Пользователи"}, tool_result)
# Кэширование метаданных 1С
metadata = get_catalog_metadata("Пользователи")
cache_metadata_1c("catalog", "Пользователи", metadata)
# Кэширование агрегированных данных
aggregates = calculate_sales_report("2024-01", {"region": "Moscow"})
cache_aggregates("sales_report", "2024-01", {"region": "Moscow"}, aggregates)
🔗 Интеграция
Интеграция с mcp_server.py
from cache.integration_examples import setup_cache_integration
# При инициализации сервера
cache_integrations = setup_cache_integration()
mcp_integration = cache_integrations['mcp_integration']
# Обработка инструмента с кэшированием
async def handle_tool_with_cache(tool_name, arguments):
# Проверяем кэш
cached_result = await mcp_integration.get_cached_tool_execution(
tool_name, arguments
)
if cached_result:
return cached_result['result']
# Выполняем инструмент
result = await execute_1c_tool(tool_name, arguments)
# Кэшируем результат
await mcp_integration.cache_tool_execution(tool_name, arguments, result)
return result
Интеграция с onec_client.py
from cache.integration_examples import execute_1c_query_with_cache
# Выполнение запроса с кэшированием
async def get_user_list(active_only=True):
query = "SELECT * FROM Справочник.Пользователи WHERE Активен = &Активен"
params = {"Активен": active_only}
# Проверяем кэш, затем выполняем запрос
return await execute_1c_query_with_cache(query, params)
Интеграция с существующим кодом
# Существующая функция
def expensive_calculation(param1, param2):
# Дорогостоящие вычисления
return complex_result
# Добавляем кэширование
@cached(ttl=1800, data_type='aggregates')
def expensive_calculation(param1, param2):
# Дорогостоящие вычисления
return complex_result
📊 Мониторинг
Получение статистики
from cache import get_cache_stats
from cache.integration_examples import CacheManager
# Базовая статистика
stats = get_cache_stats()
print(f"Записей в кэше: {stats['total_entries']}")
print(f"Использование памяти: {stats['memory_usage_mb']:.2f} MB")
print(f"Hit ratio: {stats['hit_ratio']:.2%}")
print(f"Попадания: {stats['hits']}, Промахи: {stats['misses']}")
# Детальная статистика
cache_manager = CacheManager()
detailed_stats = cache_manager.get_detailed_stats()
print(f"Распределение по типам: {detailed_stats['cache_distribution']}")
print(f"Топ инструментов: {detailed_stats['top_cached_tools']}")
Метрики производительности
cache = get_cache()
metrics = cache.get_metrics()
# Основные метрики
print(f"Hit Ratio: {metrics.hit_ratio:.2%}")
print(f"Попадания: {metrics.hits}")
print(f"Промахи: {metrics.misses}")
print(f"Вытеснения: {metrics.evictions}")
print(f"Ошибки: {metrics.errors}")
Периодическое обслуживание
import asyncio
from cache.integration_examples import periodic_cache_maintenance
# Периодическое обслуживание (каждый час)
async def maintenance_task():
while True:
await periodic_cache_maintenance()
await asyncio.sleep(3600) # 1 час
# Запуск в фоне
asyncio.create_task(maintenance_task())
📝 Примеры
Полный пример использования
import asyncio
import time
from cache import init_cache, cached, cache_tool_result, get_cache_stats
async def main():
# Инициализация
cache = init_cache(
max_size_mb=100,
default_ttl_stable=30 * 60,
default_ttl_dynamic=5 * 60,
persistent_cache_dir="./cache_data"
)
# Пример 1: Декоратор для функции
@cached(ttl=300, data_type='stable')
def get_catalog_structure(catalog_name):
print(f"Загружаем структуру справочника {catalog_name}...")
time.sleep(1) # Имитация долгой операции
return {"fields": ["id", "name", "date"], "count": 1000}
# Первый вызов - выполнится функция
structure1 = get_catalog_structure("Пользователи")
print(f"Результат 1: {structure1}")
# Второй вызов - из кэша
structure2 = get_catalog_structure("Пользователи")
print(f"Результат 2: {structure2}")
# Пример 2: Кэширование MCP tool
tool_name = "get_nomenclature"
args = {"id": 123, "include_properties": True}
result = {"nomenclature": "Товар 1", "properties": {"weight": 1.5}}
cache_tool_result(tool_name, args, result)
cached_result = get_cached_tool_result(tool_name, args)
print(f"Закэшированный результат: {cached_result}")
# Пример 3: Метаданные 1С
metadata = {
"type": "справочник",
"hierarchical": True,
"has_owners": False,
"fields": ["Код", "Наименование", "ДатаСоздания"]
}
cache_metadata_1c("catalog", "Пользователи", metadata)
cached_metadata = get_cached_metadata_1c("catalog", "Пользователи")
print(f"Закэшированные метаданные: {cached_metadata}")
# Пример 4: Агрегаты
aggregates = [
{"period": "2024-01", "sales": 100000, "count": 100},
{"period": "2024-02", "sales": 120000, "count": 120}
]
cache_aggregates("monthly_sales", "2024-Q1", {"region": "Moscow"}, aggregates)
cached_aggregates = get_cached_aggregates("monthly_sales", "2024-Q1", {"region": "Moscow"})
print(f"Закэшированные агрегаты: {cached_aggregates}")
# Пример 5: Статистика
stats = get_cache_stats()
print(f"\nСтатистика кэша:")
for key, value in stats.items():
print(f" {key}: {value}")
if __name__ == "__main__":
asyncio.run(main())
Пример с инвалидацией
from cache import get_cache, init_cache
# Инициализация
cache = init_cache(max_size_mb=10)
# Добавляем данные
cache.set("metadata:catalog:users", {"type": "справочник"})
cache.set("metadata:catalog:organizations", {"type": "справочник"})
cache.set("aggregates:sales:2024-01", [{"value": 1000}])
print(f"Записей до инвалидации: {cache.size()}") # 3
# Инвалидируем метаданные по шаблону
invalidation = cache.invalidation
count = invalidation.invalidate_by_pattern(cache, "metadata:*")
print(f"Инвалидировано записей: {count}") # 2
print(f"Записей после инвалидации: {cache.size()}") # 1
# Инвалидируем всё
invalidation.invalidate_all(cache)
print(f"Записей после полной очистки: {cache.size()}") # 0
🧪 Тестирование
Запуск тестов
# Все тесты
python -m unittest tests.test_mcp_cache -v
# Конкретный тест
python -m unittest tests.test_mcp_cache.TestMCPToolsCache.test_basic_operations -v
# С покрытием
python -m pytest tests/test_mcp_cache.py -v --cov=cache.mcp_cache
Структура тестов
- TestCacheEntry - тесты записей кэша
- TestCacheMetrics - тесты метрик
- TestCacheStrategy - тесты стратегий вытеснения
- TestCacheInvalidation - тесты инвалидации
- TestPersistentCache - тесты persistent cache
- TestMCPToolsCache - основные тесты кэша
- TestAsyncOperations - тесты асинхронных операций
- TestDecorator - тесты декораторов
- TestSpecializedCacheFunctions - тесты специализированных функций
- TestCacheStatistics - тесты статистики
- TestEdgeCases - тесты граничных случаев
Написание собственных тестов
import unittest
from cache import init_cache, get_cache
class TestMyIntegration(unittest.TestCase):
def setUp(self):
# Инициализация перед каждым тестом
init_cache(max_size_mb=10)
def test_my_cache_usage(self):
cache = get_cache()
# Тестирование функциональности
result = cache.set("test", "value")
self.assertTrue(result)
value = cache.get("test")
self.assertEqual(value, "value")
⚡ Производительность
Ожидаемые показатели
| Метрика | Базовое значение | После оптимизации |
|---|---|---|
| Hit Ratio | 0% | 80-95% |
| Средняя латентность | 100-500ms | 5-20ms |
| Стоимость токенов | 100% | 60-80% (с оптимизацией JSON) |
| Нагрузка на БД | 100% | 20-40% |
| Пропускная способность | 1x | 3-5x |
Оптимизация конфигурации
# Для высоконагруженных систем
cache = init_cache(
max_size_mb=500, # Увеличиваем размер
default_ttl_stable=60 * 60, # 1 час для стабильных данных
default_ttl_dynamic=10 * 60, # 10 минут для динамических
persistent_cache_dir="/fast_ssd/cache", # Быстрое хранилище
strategy=TTLCacheStrategy() # TTL стратегия
)
# Для экономии памяти
cache = init_cache(
max_size_mb=50, # Уменьшаем размер
default_ttl_stable=15 * 60, # 15 минут
default_ttl_dynamic=2 * 60, # 2 минуты
persistent_cache_dir=None, # Только память
strategy=LRUStrategy() # LRU стратегия
)
Мониторинг производительности
import time
from cache import get_cache
def benchmark_cache_operations():
cache = get_cache()
# Тест записи
start_time = time.time()
for i in range(1000):
cache.set(f"key_{i}", f"value_{i}")
write_time = time.time() - start_time
# Тест чтения
start_time = time.time()
for i in range(1000):
cache.get(f"key_{i}")
read_time = time.time() - start_time
print(f"Время записи 1000 записей: {write_time:.3f}s")
print(f"Время чтения 1000 записей: {read_time:.3f}s")
print(f"Время на операцию записи: {write_time*1000/1000:.3f}ms")
print(f"Время на операцию чтения: {read_time*1000/1000:.3f}ms")
🐛 Отладка
Включение подробного логирования
import logging
# Настройка логирования
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Логирование кэша
cache_logger = logging.getLogger('cache.mcp_cache')
cache_logger.setLevel(logging.DEBUG)
Проверка состояния кэша
from cache.integration_examples import CacheManager
def debug_cache_state():
cache_manager = CacheManager()
stats = cache_manager.get_detailed_stats()
print("=== Состояние кэша ===")
print(f"Всего записей: {stats['total_entries']}")
print(f"Использование памяти: {stats['memory_usage_mb']:.2f} MB")
print(f"Hit ratio: {stats['hit_ratio']:.2%}")
print("\n=== Распределение по типам ===")
for data_type, count in stats['cache_distribution'].items():
print(f"{data_type}: {count}")
print("\n=== Истёкшие записи ===")
print(f"Количество: {stats['expired_entries_count']}")
print("\n=== Топ инструментов ===")
for tool in stats['top_cached_tools'][:5]:
print(f"{tool['tool_name']}: {tool['access_count']} доступов")
🔄 Миграция и обновление
Версионирование кэша
# При изменении формата данных
def get_versioned_cache_key(key, version="v1"):
return f"{version}:{key}"
# Использование
cache_key = get_versioned_cache_key("user_data", "v2")
cache.set(cache_key, new_format_data)
Очистка старого кэша
async def migrate_cache():
"""Миграция кэша при обновлении версии"""
old_cache = get_cache()
# Создаём новый кэш с новой версией
new_cache = init_cache(persistent_cache_dir="./cache_v2")
# Переносим совместимые данные
for key, entry in old_cache._cache.items():
if not entry.is_expired:
new_key = f"v2:{key}"
new_cache.set(new_key, entry.data, ttl=entry.ttl,
data_type=determine_data_type(key))
print("Миграция кэша завершена")
📚 Дополнительные ресурсы
Примеры и интеграция
Производительность
- Базовое время ответа: < 1ms для кэш-попаданий
- Память: ~1KB на запись (зависит от размера данных)
- Пропускная способность: 10,000+ операций/сек
📄 Лицензия
Модуль создан для проекта 1C MCP Server. Версия: 1.0.0
Возможности
TODO: Добавить содержание раздела.
Быстрый Старт
TODO: Добавить содержание раздела.
Архитектура
TODO: Добавить содержание раздела.
Конфигурация
TODO: Добавить содержание раздела.
Интеграция
TODO: Добавить содержание раздела.
Мониторинг
TODO: Добавить содержание раздела.
Примеры
TODO: Добавить содержание раздела.
Тестирование
TODO: Добавить содержание раздела.