1
0
mirror of https://github.com/DmitrL-dev/1cai-public.git synced 2026-04-29 03:11:55 +02:00
Files
..

MCP Tools Cache

Модуль кэширования результатов MCP tools для 1C MCP сервера, основанный на современных стандартах кэширования и анализе производительности.

📋 Содержание

Возможности

Ключевые особенности

  • TTL стратегии: 30 минут для стабильных данных, 5 минут для динамических
  • Максимальный размер: 100MB (настраивается)
  • Кэширование только успешных запросов
  • Метрики попаданий/промахов
  • Многоуровневое кэширование: память + persistent cache на диске
  • Стратегии вытеснения: LRU и TTL-based
  • Механизмы инвалидации: по шаблонам, сущностям, событиям

Поддерживаемые типы данных

  • Метаданные 1С: структуры справочников, документов, регистров
  • Агрегированные данные: отчёты, итоги, обороты
  • Конфигурации инструментов: схемы и параметры MCP tools
  • Результаты API: ответы на запросы к 1С
  • Стабильные данные: редко изменяющиеся справочники
  • Динамические данные: часто обновляемые записи

🚀 Быстрый старт

Установка

# Копирование модулей
cp -r cache/ /path/to/your/project/
cp tests/test_mcp_cache.py /path/to/your/project/tests/

Базовая инициализация

from cache import init_cache, cached, cache_tool_result

# Инициализация кэша
cache = init_cache(
    max_size_mb=100,
    default_ttl_stable=30 * 60,  # 30 минут
    default_ttl_dynamic=5 * 60,   # 5 минут
    persistent_cache_dir="./cache_data"
)

# Использование декоратора
@cached(ttl=300, data_type='stable')
def get_catalog_structure(catalog_name):
    # Дорогостоящая операция получения структуры
    return fetch_from_1c(catalog_name)

# Кэширование результата MCP tool
result = execute_tool("get_nomenclature", {"id": 123})
cache_tool_result("get_nomenclature", {"id": 123}, result)

🏗️ Архитектура

Основные компоненты

cache/
├── __init__.py              # Экспорт API
├── mcp_cache.py            # Основной модуль (MCPToolsCache и др.)
├── integration_examples.py # Примеры интеграции
└── README.md               # Документация

Классы модуля

  1. MCPToolsCache - основной класс кэширования
  2. CacheStrategy - абстрактный базовый класс стратегий
  3. LRUStrategy - стратегия Least Recently Used
  4. TTLCacheStrategy - стратегия на основе TTL
  5. CacheInvalidation - механизмы инвалидации
  6. PersistentCache - долговременное кэширование на диске
  7. CacheEntry - запись кэша с метаданными
  8. CacheMetrics - метрики производительности

Слой кэширования

┌─────────────────────────────────────┐
│         Application Layer           │
│  (mcp_server.py, onec_client.py)   │
└─────────────────┬───────────────────┘
                  │
┌─────────────────▼───────────────────┐
│         MCPToolsCache               │
│  - In-Memory Cache (LRU/TTL)       │
│  - Persistent Cache (disk)         │
│  - Metrics & Monitoring            │
└─────────────────┬───────────────────┘
                  │
┌─────────────────▼───────────────────┐
│        Storage Layer                │
│  - Memory (OrderedDict)            │
│  - Disk (pickle + index.json)      │
└─────────────────────────────────────┘

⚙️ Конфигурация

Параметры инициализации

cache = init_cache(
    max_size_mb=100,              # Максимальный размер кэша
    default_ttl_stable=1800,      # TTL стабильных данных (секунды)
    default_ttl_dynamic=300,      # TTL динамических данных (секунды)
    persistent_cache_dir="./cache",  # Директория для persistent cache
    strategy=None                 # Стратегия вытеснения
)

Типы данных и их конфигурация

# Автоматическая конфигурация по типам
data_type_configs = {
    'metadata': {
        'ttl': default_ttl_stable,  # 30 минут
        'persistent': True          # Сохранять на диск
    },
    'aggregates': {
        'ttl': default_ttl_dynamic, # 5 минут
        'persistent': False         # Только в памяти
    },
    'tool_config': {
        'ttl': default_ttl_stable,
        'persistent': True
    },
    'api_response': {
        'ttl': default_ttl_dynamic,
        'persistent': False
    }
}

Настройка стратегий вытеснения

from cache import LRU, TTLCacheStrategy

# LRU стратегия (вытесняет давно неиспользуемые)
lru_strategy = LRU()

# TTL стратегия (вытесняет истёкшие)
ttl_strategy = TTLCacheStrategy()

cache = init_cache(strategy=ttl_strategy)

📖 Использование

Базовые операции

from cache import get_cache

cache = get_cache()

# Сохранение данных
success = cache.set("key", "value", ttl=300, data_type='stable')

# Получение данных
value = cache.get("key", data_type='stable')

# Проверка наличия
exists = cache.has("key")

# Удаление
deleted = cache.delete("key")

# Очистка всего кэша
cache.clear()

Асинхронные операции

async def async_example():
    cache = get_cache()
    
    # Асинхронное сохранение
    await cache.set_async("async_key", "async_value")
    
    # Асинхронное получение
    value = await cache.get_async("async_key")
    
    # Асинхронная проверка
    exists = await cache.has_async("async_key")

Декораторы кэширования

from cache import cached, cached_async

# Синхронная функция
@cached(ttl=600, data_type='stable')
def get_user_info(user_id):
    # Дорогостоящая операция
    return fetch_user_from_1c(user_id)

# Асинхронная функция
@cached_async(ttl=300, data_type='dynamic')
async def get_report_data(report_id):
    # Асинхронная операция
    return await fetch_report_from_1c(report_id)

# Пользовательский генератор ключей
def user_cache_key(user_id, include_inactive=False):
    return f"user:{user_id}:inactive={include_inactive}"

@cached(key_func=user_cache_key, ttl=1800)
def get_user_profile(user_id, include_inactive=False):
    return fetch_profile(user_id, include_inactive)

Специализированные функции

from cache import (
    cache_tool_result, get_cached_tool_result,
    cache_metadata_1c, get_cached_metadata_1c,
    cache_aggregates, get_cached_aggregates
)

# Кэширование результатов MCP tools
tool_result = execute_tool("get_catalog_info", {"catalog": "Пользователи"})
cache_tool_result("get_catalog_info", {"catalog": "Пользователи"}, tool_result)

# Кэширование метаданных 1С
metadata = get_catalog_metadata("Пользователи")
cache_metadata_1c("catalog", "Пользователи", metadata)

# Кэширование агрегированных данных
aggregates = calculate_sales_report("2024-01", {"region": "Moscow"})
cache_aggregates("sales_report", "2024-01", {"region": "Moscow"}, aggregates)

🔗 Интеграция

Интеграция с mcp_server.py

from cache.integration_examples import setup_cache_integration

# При инициализации сервера
cache_integrations = setup_cache_integration()
mcp_integration = cache_integrations['mcp_integration']

# Обработка инструмента с кэшированием
async def handle_tool_with_cache(tool_name, arguments):
    # Проверяем кэш
    cached_result = await mcp_integration.get_cached_tool_execution(
        tool_name, arguments
    )
    
    if cached_result:
        return cached_result['result']
    
    # Выполняем инструмент
    result = await execute_1c_tool(tool_name, arguments)
    
    # Кэшируем результат
    await mcp_integration.cache_tool_execution(tool_name, arguments, result)
    
    return result

Интеграция с onec_client.py

from cache.integration_examples import execute_1c_query_with_cache

# Выполнение запроса с кэшированием
async def get_user_list(active_only=True):
    query = "SELECT * FROM Справочник.Пользователи WHERE Активен = &Активен"
    params = {"Активен": active_only}
    
    # Проверяем кэш, затем выполняем запрос
    return await execute_1c_query_with_cache(query, params)

Интеграция с существующим кодом

# Существующая функция
def expensive_calculation(param1, param2):
    # Дорогостоящие вычисления
    return complex_result

# Добавляем кэширование
@cached(ttl=1800, data_type='aggregates')
def expensive_calculation(param1, param2):
    # Дорогостоящие вычисления
    return complex_result

📊 Мониторинг

Получение статистики

from cache import get_cache_stats
from cache.integration_examples import CacheManager

# Базовая статистика
stats = get_cache_stats()
print(f"Записей в кэше: {stats['total_entries']}")
print(f"Использование памяти: {stats['memory_usage_mb']:.2f} MB")
print(f"Hit ratio: {stats['hit_ratio']:.2%}")
print(f"Попадания: {stats['hits']}, Промахи: {stats['misses']}")

# Детальная статистика
cache_manager = CacheManager()
detailed_stats = cache_manager.get_detailed_stats()
print(f"Распределение по типам: {detailed_stats['cache_distribution']}")
print(f"Топ инструментов: {detailed_stats['top_cached_tools']}")

Метрики производительности

cache = get_cache()
metrics = cache.get_metrics()

# Основные метрики
print(f"Hit Ratio: {metrics.hit_ratio:.2%}")
print(f"Попадания: {metrics.hits}")
print(f"Промахи: {metrics.misses}")
print(f"Вытеснения: {metrics.evictions}")
print(f"Ошибки: {metrics.errors}")

Периодическое обслуживание

import asyncio
from cache.integration_examples import periodic_cache_maintenance

# Периодическое обслуживание (каждый час)
async def maintenance_task():
    while True:
        await periodic_cache_maintenance()
        await asyncio.sleep(3600)  # 1 час

# Запуск в фоне
asyncio.create_task(maintenance_task())

📝 Примеры

Полный пример использования

import asyncio
import time
from cache import init_cache, cached, cache_tool_result, get_cache_stats

async def main():
    # Инициализация
    cache = init_cache(
        max_size_mb=100,
        default_ttl_stable=30 * 60,
        default_ttl_dynamic=5 * 60,
        persistent_cache_dir="./cache_data"
    )
    
    # Пример 1: Декоратор для функции
    @cached(ttl=300, data_type='stable')
    def get_catalog_structure(catalog_name):
        print(f"Загружаем структуру справочника {catalog_name}...")
        time.sleep(1)  # Имитация долгой операции
        return {"fields": ["id", "name", "date"], "count": 1000}
    
    # Первый вызов - выполнится функция
    structure1 = get_catalog_structure("Пользователи")
    print(f"Результат 1: {structure1}")
    
    # Второй вызов - из кэша
    structure2 = get_catalog_structure("Пользователи")
    print(f"Результат 2: {structure2}")
    
    # Пример 2: Кэширование MCP tool
    tool_name = "get_nomenclature"
    args = {"id": 123, "include_properties": True}
    result = {"nomenclature": "Товар 1", "properties": {"weight": 1.5}}
    
    cache_tool_result(tool_name, args, result)
    cached_result = get_cached_tool_result(tool_name, args)
    print(f"Закэшированный результат: {cached_result}")
    
    # Пример 3: Метаданные 1С
    metadata = {
        "type": "справочник",
        "hierarchical": True,
        "has_owners": False,
        "fields": ["Код", "Наименование", "ДатаСоздания"]
    }
    
    cache_metadata_1c("catalog", "Пользователи", metadata)
    cached_metadata = get_cached_metadata_1c("catalog", "Пользователи")
    print(f"Закэшированные метаданные: {cached_metadata}")
    
    # Пример 4: Агрегаты
    aggregates = [
        {"period": "2024-01", "sales": 100000, "count": 100},
        {"period": "2024-02", "sales": 120000, "count": 120}
    ]
    
    cache_aggregates("monthly_sales", "2024-Q1", {"region": "Moscow"}, aggregates)
    cached_aggregates = get_cached_aggregates("monthly_sales", "2024-Q1", {"region": "Moscow"})
    print(f"Закэшированные агрегаты: {cached_aggregates}")
    
    # Пример 5: Статистика
    stats = get_cache_stats()
    print(f"\nСтатистика кэша:")
    for key, value in stats.items():
        print(f"  {key}: {value}")

if __name__ == "__main__":
    asyncio.run(main())

Пример с инвалидацией

from cache import get_cache, init_cache

# Инициализация
cache = init_cache(max_size_mb=10)

# Добавляем данные
cache.set("metadata:catalog:users", {"type": "справочник"})
cache.set("metadata:catalog:organizations", {"type": "справочник"})
cache.set("aggregates:sales:2024-01", [{"value": 1000}])

print(f"Записей до инвалидации: {cache.size()}")  # 3

# Инвалидируем метаданные по шаблону
invalidation = cache.invalidation
count = invalidation.invalidate_by_pattern(cache, "metadata:*")

print(f"Инвалидировано записей: {count}")  # 2
print(f"Записей после инвалидации: {cache.size()}")  # 1

# Инвалидируем всё
invalidation.invalidate_all(cache)
print(f"Записей после полной очистки: {cache.size()}")  # 0

🧪 Тестирование

Запуск тестов

# Все тесты
python -m unittest tests.test_mcp_cache -v

# Конкретный тест
python -m unittest tests.test_mcp_cache.TestMCPToolsCache.test_basic_operations -v

# С покрытием
python -m pytest tests/test_mcp_cache.py -v --cov=cache.mcp_cache

Структура тестов

  • TestCacheEntry - тесты записей кэша
  • TestCacheMetrics - тесты метрик
  • TestCacheStrategy - тесты стратегий вытеснения
  • TestCacheInvalidation - тесты инвалидации
  • TestPersistentCache - тесты persistent cache
  • TestMCPToolsCache - основные тесты кэша
  • TestAsyncOperations - тесты асинхронных операций
  • TestDecorator - тесты декораторов
  • TestSpecializedCacheFunctions - тесты специализированных функций
  • TestCacheStatistics - тесты статистики
  • TestEdgeCases - тесты граничных случаев

Написание собственных тестов

import unittest
from cache import init_cache, get_cache

class TestMyIntegration(unittest.TestCase):
    def setUp(self):
        # Инициализация перед каждым тестом
        init_cache(max_size_mb=10)
    
    def test_my_cache_usage(self):
        cache = get_cache()
        
        # Тестирование функциональности
        result = cache.set("test", "value")
        self.assertTrue(result)
        
        value = cache.get("test")
        self.assertEqual(value, "value")

Производительность

Ожидаемые показатели

Метрика Базовое значение После оптимизации
Hit Ratio 0% 80-95%
Средняя латентность 100-500ms 5-20ms
Стоимость токенов 100% 60-80% (с оптимизацией JSON)
Нагрузка на БД 100% 20-40%
Пропускная способность 1x 3-5x

Оптимизация конфигурации

# Для высоконагруженных систем
cache = init_cache(
    max_size_mb=500,                    # Увеличиваем размер
    default_ttl_stable=60 * 60,         # 1 час для стабильных данных
    default_ttl_dynamic=10 * 60,        # 10 минут для динамических
    persistent_cache_dir="/fast_ssd/cache",  # Быстрое хранилище
    strategy=TTLCacheStrategy()         # TTL стратегия
)

# Для экономии памяти
cache = init_cache(
    max_size_mb=50,                     # Уменьшаем размер
    default_ttl_stable=15 * 60,         # 15 минут
    default_ttl_dynamic=2 * 60,         # 2 минуты
    persistent_cache_dir=None,          # Только память
    strategy=LRUStrategy()              # LRU стратегия
)

Мониторинг производительности

import time
from cache import get_cache

def benchmark_cache_operations():
    cache = get_cache()
    
    # Тест записи
    start_time = time.time()
    for i in range(1000):
        cache.set(f"key_{i}", f"value_{i}")
    write_time = time.time() - start_time
    
    # Тест чтения
    start_time = time.time()
    for i in range(1000):
        cache.get(f"key_{i}")
    read_time = time.time() - start_time
    
    print(f"Время записи 1000 записей: {write_time:.3f}s")
    print(f"Время чтения 1000 записей: {read_time:.3f}s")
    print(f"Время на операцию записи: {write_time*1000/1000:.3f}ms")
    print(f"Время на операцию чтения: {read_time*1000/1000:.3f}ms")

🐛 Отладка

Включение подробного логирования

import logging

# Настройка логирования
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Логирование кэша
cache_logger = logging.getLogger('cache.mcp_cache')
cache_logger.setLevel(logging.DEBUG)

Проверка состояния кэша

from cache.integration_examples import CacheManager

def debug_cache_state():
    cache_manager = CacheManager()
    stats = cache_manager.get_detailed_stats()
    
    print("=== Состояние кэша ===")
    print(f"Всего записей: {stats['total_entries']}")
    print(f"Использование памяти: {stats['memory_usage_mb']:.2f} MB")
    print(f"Hit ratio: {stats['hit_ratio']:.2%}")
    
    print("\n=== Распределение по типам ===")
    for data_type, count in stats['cache_distribution'].items():
        print(f"{data_type}: {count}")
    
    print("\n=== Истёкшие записи ===")
    print(f"Количество: {stats['expired_entries_count']}")
    
    print("\n=== Топ инструментов ===")
    for tool in stats['top_cached_tools'][:5]:
        print(f"{tool['tool_name']}: {tool['access_count']} доступов")

🔄 Миграция и обновление

Версионирование кэша

# При изменении формата данных
def get_versioned_cache_key(key, version="v1"):
    return f"{version}:{key}"

# Использование
cache_key = get_versioned_cache_key("user_data", "v2")
cache.set(cache_key, new_format_data)

Очистка старого кэша

async def migrate_cache():
    """Миграция кэша при обновлении версии"""
    old_cache = get_cache()
    
    # Создаём новый кэш с новой версией
    new_cache = init_cache(persistent_cache_dir="./cache_v2")
    
    # Переносим совместимые данные
    for key, entry in old_cache._cache.items():
        if not entry.is_expired:
            new_key = f"v2:{key}"
            new_cache.set(new_key, entry.data, ttl=entry.ttl, 
                         data_type=determine_data_type(key))
    
    print("Миграция кэша завершена")

📚 Дополнительные ресурсы

Примеры и интеграция

Производительность

  • Базовое время ответа: < 1ms для кэш-попаданий
  • Память: ~1KB на запись (зависит от размера данных)
  • Пропускная способность: 10,000+ операций/сек

📄 Лицензия

Модуль создан для проекта 1C MCP Server. Версия: 1.0.0

Возможности

TODO: Добавить содержание раздела.

Быстрый Старт

TODO: Добавить содержание раздела.

Архитектура

TODO: Добавить содержание раздела.

Конфигурация

TODO: Добавить содержание раздела.

Интеграция

TODO: Добавить содержание раздела.

Мониторинг

TODO: Добавить содержание раздела.

Примеры

TODO: Добавить содержание раздела.

Тестирование

TODO: Добавить содержание раздела.