1
0
mirror of https://github.com/DmitrL-dev/1cai-public.git synced 2026-04-29 13:34:00 +02:00
Files

394 lines
14 KiB
Python

# [NEXUS IDENTITY] ID: -9069577396218547301 | DATE: 2025-11-19
"""
Пример интеграции HTTP кэширования с ETag в существующий FastAPI сервер.
Демонстрирует:
1. Настройку middleware для FastAPI приложения
2. Интеграцию с существующей OAuth2 авторизацией
3. Использование различных стратегий кэширования
4. Мониторинг метрик кэша
Основан на архитектуре из src/py_server/main.py и src/py_server/http_server.py
"""
import logging
from typing import Any, Dict, Optional
import uvicorn
from fastapi import FastAPI, Response
from fastapi.responses import JSONResponse
from .http_cache import (CacheHeaders, metrics_collector, setup_cache_middleware)
logger = logging.getLogger(__name__)
def create_cached_fastapi_app(
base_app: Optional[FastAPI] = None,
cache_config: Optional[Dict[str, Any]] = None
) -> FastAPI:
"""
Создает FastAPI приложение с настроенным HTTP кэшированием.
Args:
base_app: Базовое приложение для расширения
cache_config: Конфигурация кэширования
Returns:
FastAPI приложение с кэшированием
"""
# Конфигурация кэширования по умолчанию
default_cache_config = {
"secret_key": "mcp_server_secret_2024",
"cache_ttl": 3600, # 1 час
"max_cache_size": 1000,
"excluded_paths": {
"/health",
"/info",
"/token",
"/authorize",
"/register"
}
}
# Обновляем конфигурацию
if cache_config:
default_cache_config.update(cache_config)
cache_config = default_cache_config
# Создаем или используем базовое приложение
if base_app is None:
app = FastAPI(
title="MCP Server with HTTP Cache",
description="MCP-прокси сервер с HTTP кэшированием",
version="2.0.0"
)
else:
app = base_app
# Настраиваем HTTP кэширование
cache_middleware = setup_cache_middleware(
app=app,
secret_key=cache_config["secret_key"],
cache_ttl=cache_config["cache_ttl"],
max_cache_size=cache_config["max_cache_size"],
excluded_paths=cache_config["excluded_paths"]
)
# Добавляем дополнительные endpoints для управления кэшем
@app.get("/cache/admin/stats")
async def get_cache_admin_stats():
"""Подробная статистика кэша для администраторов."""
return metrics_collector.get_summary()
@app.post("/cache/admin/clear")
async def clear_cache():
"""Очистка всего кэша."""
for middleware in metrics_collector.middlewares:
middleware._cache.clear()
middleware._cache_order.clear()
logger.info("Cache cleared by admin request")
return {"status": "cleared", "message": "All cache entries removed"}
@app.get("/cache/admin/export")
async def export_cache_metrics():
"""Экспорт метрик для внешних систем мониторинга."""
return {
"prometheus_format": metrics_collector.export_prometheus(),
"json_format": metrics_collector.get_summary()
}
return app
def create_content_specific_cache_strategies() -> Dict[str, Dict[str, Any]]:
"""
Создает стратегии кэширования для разных типов контента.
Returns:
Словарь с конфигурациями для разных типов контента
"""
return {
# Статические ресурсы (справочники, редко изменяющиеся данные)
"static_data": {
"max_age": 86400, # 24 часа
"s_maxage": 43200, # 12 часов для CDN
"immutable": True,
"public": True
},
# Часто изменяющиеся данные (документы, оперативные данные)
"dynamic_data": {
"max_age": 300, # 5 минут
"s_maxage": 60, # 1 минута для CDN
"stale_while_revalidate": 30,
"stale_if_error": 300,
"public": True
},
# Персонализированные данные (зависят от пользователя)
"personal_data": {
"max_age": 1800, # 30 минут
"private": True,
"no_cache": False
},
# API данные с высокой нагрузкой
"api_data": {
"max_age": 180, # 3 минуты
"s_maxage": 30, # 30 секунд для CDN
"stale_while_revalidate": 60,
"stale_if_error": 600,
"public": True
}
}
def apply_content_specific_headers(
response: Response,
content_type: str,
data_type: str
) -> Response:
"""
Применяет специфичные заголовки кэширования для типа контента.
Args:
response: HTTP ответ
content_type: Тип контента
data_type: Тип данных для определения стратегии
Returns:
Обновленный HTTP ответ
"""
strategies = create_content_specific_cache_strategies()
if data_type not in strategies:
data_type = "api_data" # Дефолтная стратегия
strategy = strategies[data_type]
# Создаем Cache-Control заголовок
cache_control = CacheHeaders.create_cache_control(
public=strategy.get("public", False),
private=strategy.get("private", False),
max_age=strategy.get("max_age"),
s_maxage=strategy.get("s_maxage"),
no_cache=strategy.get("no_cache", False),
immutable=strategy.get("immutable", False),
stale_while_revalidate=strategy.get("stale_while_revalidate"),
stale_if_error=strategy.get("stale_if_error")
)
response.headers["Cache-Control"] = cache_control
# Добавляем X-Content-Type для отладки
response.headers["X-Content-Strategy"] = data_type
return response
def create_cached_mcp_endpoints(app: FastAPI, config: Dict[str, Any]) -> None:
"""
Создает MCP endpoints с кэшированием.
Args:
app: FastAPI приложение
config: Конфигурация сервера
"""
@app.get("/mcp/cached/health")
async def cached_health_check():
"""Кэшируемая проверка здоровья сервера."""
return {
"status": "healthy",
"timestamp": "2024-01-01T00:00:00Z",
"version": "2.0.0",
"cache_enabled": True
}
@app.get("/mcp/cached/info")
async def cached_server_info():
"""Кэшируемая информация о сервере."""
return {
"name": "MCP Server with HTTP Cache",
"version": "2.0.0",
"description": "MCP-прокси сервер с HTTP кэшированием и ETag",
"features": [
"HTTP caching with ETag",
"Conditional GET requests",
"Cache metrics and monitoring",
"Multiple cache strategies",
"FastAPI middleware integration"
],
"cache_stats": metrics_collector.get_summary()
}
@app.get("/mcp/cached/metadata/{entity_type}")
async def get_cached_metadata(entity_type: str):
"""Кэшируемые метаданные 1С."""
# Имитация получения метаданных
metadata = {
"entity_type": entity_type,
"fields": [
{"name": "id", "type": "string", "required": True},
{"name": "name", "type": "string", "required": True},
{"name": "code", "type": "string", "required": False}
],
"timestamp": "2024-01-01T00:00:00Z"
}
return apply_content_specific_headers(
JSONResponse(content=metadata),
content_type="application/json",
data_type="static_data"
)
@app.get("/mcp/cached/data/{entity_type}/{id}")
async def get_cached_entity_data(entity_type: str, id: str):
"""Кэшируемые данные сущности."""
# Имитация получения данных
data = {
"entity_type": entity_type,
"id": id,
"name": f"Entity {id}",
"code": f"CODE_{id}",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z"
}
return apply_content_specific_headers(
JSONResponse(content=data),
content_type="application/json",
data_type="dynamic_data"
)
def integrate_with_existing_server():
"""
Интеграция с существующим MCP сервером.
Показывает как добавить кэширование к существующему FastAPI приложению.
"""
# Читаем существующий код из src/py_server/main.py и http_server.py
# и показываем как интегрировать кэширование
print("""
Интеграция HTTP кэширования с существующим MCP сервером:
1. В main.py добавляем:
from cache.http_cache import setup_cache_middleware
# В функции run_http_server:
cache_middleware = setup_cache_middleware(
app=app,
cache_ttl=3600,
max_cache_size=1000,
excluded_paths={"/health", "/info", "/token"}
)
2. В http_server.py добавляем кастомные стратегии:
from cache.http_cache import CacheHeaders, create_content_specific_cache_strategies
# Для разных типов данных применяем разные стратегии кэширования
response.headers["Cache-Control"] = CacheHeaders.create_cache_control(
public=True,
max_age=3600,
stale_while_revalidate=60
)
3. Добавляем endpoints для мониторинга кэша:
@app.get("/cache/metrics")
async def cache_metrics():
return metrics_collector.get_summary()
4. Настраиваем логирование кэша:
logger = logging.getLogger("cache.http_cache")
logger.setLevel(logging.INFO)
Ключевые преимущества интеграции:
- Прозрачное кэширование существующих endpoints
- Поддержка условных запросов (If-None-Match)
- Автоматическая генерация ETag
- Метрики производительности
- Гибкие стратегии кэширования
- Интеграция с OAuth2 авторизацией
""")
def run_example_server():
"""Запуск примера сервера с кэшированием."""
# Создаем приложение с кэшированием
app = create_cached_fastapi_app(cache_config={
"cache_ttl": 1800, # 30 минут
"max_cache_size": 500,
"excluded_paths": {
"/health",
"/info",
"/cache/admin/stats",
"/cache/admin/clear"
}
})
# Добавляем MCP endpoints с кэшированием
create_cached_mcp_endpoints(app, {})
# Добавляем общие endpoints
@app.get("/")
async def root():
return {
"message": "MCP Server with HTTP Cache",
"endpoints": [
"/health - Health check",
"/info - Server info",
"/cache/metrics - Cache metrics",
"/cache/metrics.prometheus - Prometheus format",
"/mcp/cached/health - Cached health",
"/mcp/cached/info - Cached info",
"/mcp/cached/metadata/{entity_type} - Cached metadata",
"/mcp/cached/data/{entity_type}/{id} - Cached entity data"
]
}
@app.get("/health")
async def health():
return {"status": "healthy", "cache_version": "2.0.0"}
print("""
Запуск примера сервера с HTTP кэшированием:
Сервер доступен по адресу: http://localhost:8000
Endpoints для тестирования кэша:
- GET /mcp/cached/health - Кэшируемая проверка здоровья
- GET /mcp/cached/info - Кэшируемая информация о сервере
- GET /mcp/cached/metadata/nomenclature - Кэшируемые метаданные
- GET /mcp/cached/data/nomenclature/123 - Кэшируемые данные
- GET /cache/metrics - Метрики кэша
- GET /cache/metrics.prometheus - Метрики в формате Prometheus
Тестирование ETag:
1. Первый запрос вернет ответ с заголовками ETag и Cache-Control
2. Повторный запрос с заголовком If-None-Match вернет 304 Not Modified
3. В заголовке X-Cache будет указано HIT или MISS
Пример curl команды:
curl -i http://localhost:8000/mcp/cached/info
curl -i -H "If-None-Match: \\"etag_value\\"" http://localhost:8000/mcp/cached/info
""")
# Запускаем сервер
uvicorn.run(
app,
host="127.0.0.1",
port=8000,
log_level="info"
)
if __name__ == "__main__":
run_example_server()