1
0
mirror of https://github.com/DmitrL-dev/1cai-public.git synced 2026-04-29 13:23:57 +02:00
Files

728 lines
28 KiB
Python

# [NEXUS IDENTITY] ID: 1740785978684900644 | DATE: 2025-11-19
"""
API администрирования кэша для 1С сервера
Основан на стандартах мониторинга и кэширования из RFC 7234
и практических рекомендациях по кэшированию в 1С:Предприятие.
Включает:
- Endpoints для мониторинга и управления кэшем
- Middleware для сбора метрик
- Аутентификацию для административных операций
- Интеграцию с различными типами кэшей
"""
import json
import logging
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
import psutil
from fastapi import (APIRouter, Depends, HTTPException, Request, Security,
status)
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel, Field
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Схемы Pydantic для API
class CacheStats(BaseModel):
"""Модель статистики кэша"""
total_keys: int = Field(..., description="Общее количество ключей")
memory_usage_bytes: int = Field(..., description="Использование памяти в байтах")
memory_usage_mb: float = Field(..., description="Использование памяти в МБ")
hit_count: int = Field(..., description="Количество попаданий")
miss_count: int = Field(..., description="Количество промахов")
hit_rate: float = Field(..., description="Коэффициент попаданий (0.0 - 1.0)")
avg_response_time_ms: float = Field(..., description="Среднее время отклика в мс")
max_response_time_ms: float = Field(..., description="Максимальное время отклика в мс")
min_response_time_ms: float = Field(..., description="Минимальное время отклика в мс")
last_updated: datetime = Field(default_factory=datetime.now, description="Время последнего обновления")
class CacheHealth(BaseModel):
"""Модель проверки здоровья кэша"""
status: str = Field(..., description="Статус (healthy/degraded/unhealthy)")
timestamp: datetime = Field(default_factory=datetime.now, description="Время проверки")
checks: Dict[str, Any] = Field(..., description="Результаты проверок")
uptime_seconds: float = Field(..., description="Время работы в секундах")
class CacheKeyInfo(BaseModel):
"""Модель информации о ключе кэша"""
key: str = Field(..., description="Ключ кэша")
size_bytes: int = Field(..., description="Размер данных в байтах")
ttl_seconds: Optional[int] = Field(None, description="Время жизни в секундах")
created_at: datetime = Field(..., description="Время создания")
last_accessed: datetime = Field(..., description="Последний доступ")
hit_count: int = Field(default=0, description="Количество обращений")
class InvalidateRequest(BaseModel):
"""Модель запроса на инвалидацию кэша"""
keys: List[str] = Field(..., description="Список ключей для инвалидации")
reason: Optional[str] = Field(None, description="Причина инвалидации")
# Счетчики и метрики кэша
@dataclass
class CacheMetrics:
"""Метрики производительности кэша"""
hit_count: int = 0
miss_count: int = 0
response_times: deque = deque(maxlen=1000) # Последние 1000 измерений
start_time: float = time.time()
memory_snapshots: deque = deque(maxlen=100) # Последние 100 снимков памяти
@property
def hit_rate(self) -> float:
"""Вычисляет коэффициент попаданий"""
total = self.hit_count + self.miss_count
return self.hit_count / total if total > 0 else 0.0
@property
def avg_response_time(self) -> float:
"""Вычисляет среднее время отклика"""
if not self.response_times:
return 0.0
return sum(self.response_times) / len(self.response_times)
@property
def max_response_time(self) -> float:
"""Максимальное время отклика"""
return max(self.response_times) if self.response_times else 0.0
@property
def min_response_time(self) -> float:
"""Минимальное время отклика"""
return min(self.response_times) if self.response_times else 0.0
# Глобальные переменные для метрик
cache_metrics = CacheMetrics()
active_caches: Dict[str, Any] = {}
# Аутентификация
security = HTTPBearer(auto_error=False)
async def verify_admin_token(
credentials: HTTPAuthorizationCredentials = Security(security)
) -> bool:
"""
Проверка токена администратора.
В реальном приложении здесь должна быть проверка JWT токена
или интеграция с системой аутентификации.
"""
# Простая проверка токена (в реальности нужна безопасная проверка)
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Требуется аутентификация",
headers={"WWW-Authenticate": "Bearer"},
)
# Проверка токена (пример - должен быть заменен на реальную проверку)
expected_token = "admin_token_123" # В продакшене - из переменных окружения
if credentials.credentials != expected_token:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав доступа"
)
return True
# Middleware для сбора метрик
async def cache_middleware(request: Request, call_next):
"""Middleware для сбора метрик кэша"""
start_time = time.time()
# Игнорируем internal пути
if request.url.path.startswith(("/cache/", "/docs", "/openapi.json", "/health")):
return await call_next(request)
try:
response = await call_next(request)
# Сбор метрик для кэш-запросов
if response.headers.get("X-Cache-Status") in ["HIT", "MISS"]:
response_time = (time.time() - start_time) * 1000 # в миллисекундах
cache_metrics.response_times.append(response_time)
cache_status = response.headers.get("X-Cache-Status")
if cache_status == "HIT":
cache_metrics.hit_count += 1
elif cache_status == "MISS":
cache_metrics.miss_count += 1
# Добавляем заголовки с метриками
if cache_metrics.response_times:
response.headers["X-Avg-Response-Time"] = f"{cache_metrics.avg_response_time:.2f}ms"
response.headers["X-Cache-Hit-Rate"] = f"{cache_metrics.hit_rate:.2%}"
return response
except Exception as e:
logger.error(f"Ошибка в cache middleware: {e}")
raise
# Интерфейс кэша
class CacheInterface:
"""Базовый интерфейс для работы с кэшами"""
def __init__(self, name: str, cache_type: str = "memory"):
self.name = name
self.cache_type = cache_type
self._data: Dict[str, Any] = {}
self._metadata: Dict[str, Dict[str, Any]] = {}
self._created_at = time.time()
def get(self, key: str) -> Optional[Any]:
"""Получить значение из кэша"""
start_time = time.time()
if key in self._data:
# Обновляем метаданные
self._metadata[key]["last_accessed"] = datetime.now()
self._metadata[key]["hit_count"] += 1
response_time = (time.time() - start_time) * 1000
cache_metrics.response_times.append(response_time)
cache_metrics.hit_count += 1
return self._data[key]
cache_metrics.miss_count += 1
return None
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
"""Установить значение в кэш"""
self._data[key] = value
# Сохраняем метаданные
self._metadata[key] = {
"created_at": datetime.now(),
"last_accessed": datetime.now(),
"hit_count": 0,
"ttl": ttl,
"size_bytes": len(json.dumps(value, default=str))
}
def delete(self, key: str) -> bool:
"""Удалить значение из кэша"""
if key in self._data:
del self._data[key]
del self._metadata[key]
return True
return False
def clear(self) -> None:
"""Очистить кэш"""
self._data.clear()
self._metadata.clear()
def get_stats(self) -> Dict[str, Any]:
"""Получить статистику кэша"""
total_size = sum(
meta.get("size_bytes", 0)
for meta in self._metadata.values()
)
return {
"name": self.name,
"type": self.cache_type,
"total_keys": len(self._data),
"memory_usage_bytes": total_size,
"memory_usage_mb": total_size / (1024 * 1024),
"uptime_seconds": time.time() - self._created_at
}
def get_keys_info(self) -> List[CacheKeyInfo]:
"""Получить информацию о всех ключах"""
keys_info = []
for key, meta in self._metadata.items():
keys_info.append(CacheKeyInfo(
key=key,
size_bytes=meta.get("size_bytes", 0),
ttl_seconds=meta.get("ttl"),
created_at=meta["created_at"],
last_accessed=meta["last_accessed"],
hit_count=meta["hit_count"]
))
return keys_info
def get_key(self, key: str) -> Optional[CacheKeyInfo]:
"""Получить информацию о конкретном ключе"""
if key in self._metadata:
meta = self._metadata[key]
return CacheKeyInfo(
key=key,
size_bytes=meta.get("size_bytes", 0),
ttl_seconds=meta.get("ttl"),
created_at=meta["created_at"],
last_accessed=meta["last_accessed"],
hit_count=meta["hit_count"]
)
return None
# Реализация различных типов кэшей
class MemoryCache(CacheInterface):
"""Кэш в памяти"""
def __init__(self, name: str):
super().__init__(name, "memory")
class RedisCache:
"""Адаптер для Redis (заглушка)"""
def __init__(self, name: str, connection_string: str = "redis://localhost:6379"):
self.name = name
self.connection_string = connection_string
self._metadata: Dict[str, Dict[str, Any]] = {}
# В реальной реализации здесь будет подключение к Redis
logger.info(f"Инициализация Redis кэша: {name}")
def get_stats(self) -> Dict[str, Any]:
"""Статистика Redis кэша"""
return {
"name": self.name,
"type": "redis",
"connection": self.connection_string,
"total_keys": len(self._metadata),
"memory_usage_bytes": 0, # В реальности получаем из Redis
"uptime_seconds": 0
}
# Инициализация кэшей
def initialize_caches():
"""Инициализация различных кэшей системы"""
global active_caches
# Кэш метаданных 1С
active_caches["metadata"] = MemoryCache("1c_metadata")
# Кэш результатов вычислений
active_caches["computations"] = MemoryCache("1c_computations")
# Кэш HTTP запросов
active_caches["http_responses"] = MemoryCache("http_responses")
# Кэш пользовательских сессий
active_caches["user_sessions"] = MemoryCache("user_sessions")
# Redis кэш (если настроен)
if True: # В реальности - проверка наличия Redis
active_caches["redis"] = RedisCache("main_redis")
logger.info(f"Инициализировано {len(active_caches)} кэшей")
# Создание роутера
router = APIRouter(prefix="/cache", tags=["cache_admin"])
@router.get("/stats", response_model=CacheStats)
async def get_cache_stats(
current_user: bool = Depends(verify_admin_token)
) -> CacheStats:
"""
Получить статистику всех кэшей системы.
Включает:
- Общее количество ключей
- Использование памяти
- Коэффициент попаданий
- Время отклика
"""
try:
# Собираем статистику по всем кэшам
total_keys = 0
total_memory = 0
cache_details = []
for cache_name, cache in active_caches.items():
if hasattr(cache, 'get_stats'):
stats = cache.get_stats()
total_keys += stats["total_keys"]
total_memory += stats["memory_usage_bytes"]
cache_details.append(stats)
# Добавляем системную информацию о памяти
process = psutil.Process()
system_memory = process.memory_info().rss
return CacheStats(
total_keys=total_keys,
memory_usage_bytes=total_memory + system_memory,
memory_usage_mb=(total_memory + system_memory) / (1024 * 1024),
hit_count=cache_metrics.hit_count,
miss_count=cache_metrics.miss_count,
hit_rate=cache_metrics.hit_rate,
avg_response_time_ms=cache_metrics.avg_response_time,
max_response_time_ms=cache_metrics.max_response_time,
min_response_time_ms=cache_metrics.min_response_time
)
except Exception as e:
logger.error(f"Ошибка получения статистики кэша: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Ошибка получения статистики: {str(e)}"
)
@router.get("/keys", response_model=List[CacheKeyInfo])
async def get_cache_keys(
cache_name: Optional[str] = None,
limit: int = 100,
offset: int = 0,
current_user: bool = Depends(verify_admin_token)
) -> List[CacheKeyInfo]:
"""
Получить список ключей в кэшах.
- cache_name: имя конкретного кэша (опционально)
- limit: максимальное количество записей
- offset: смещение для пагинации
"""
try:
keys_info = []
if cache_name:
# Получаем ключи из конкретного кэша
if cache_name not in active_caches:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Кэш '{cache_name}' не найден"
)
cache = active_caches[cache_name]
if hasattr(cache, 'get_keys_info'):
keys_info = cache.get_keys_info()
else:
# Получаем ключи из всех кэшей
for name, cache in active_caches.items():
if hasattr(cache, 'get_keys_info'):
cache_keys = cache.get_keys_info()
# Добавляем префикс с именем кэша
for key_info in cache_keys:
key_info.key = f"{name}:{key_info.key}"
keys_info.extend(cache_keys)
# Сортируем по времени создания (новые сначала)
keys_info.sort(key=lambda x: x.created_at, reverse=True)
# Применяем пагинацию
paginated_keys = keys_info[offset:offset + limit]
return paginated_keys
except HTTPException:
raise
except Exception as e:
logger.error(f"Ошибка получения ключей кэша: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Ошибка получения ключей: {str(e)}"
)
@router.delete("/clear")
async def clear_cache(
cache_name: Optional[str] = None,
current_user: bool = Depends(verify_admin_token)
) -> Dict[str, str]:
"""
Очистить кэш или все кэши.
- cache_name: имя конкретного кэша (опционально)
"""
try:
cleared_caches = []
if cache_name:
# Очищаем конкретный кэш
if cache_name not in active_caches:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Кэш '{cache_name}' не найден"
)
cache = active_caches[cache_name]
if hasattr(cache, 'clear'):
cache.clear()
cleared_caches.append(cache_name)
else:
# Очищаем все кэши
for name, cache in active_caches.items():
if hasattr(cache, 'clear'):
cache.clear()
cleared_caches.append(name)
# Сбрасываем глобальные метрики
cache_metrics.hit_count = 0
cache_metrics.miss_count = 0
cache_metrics.response_times.clear()
return {
"status": "success",
"message": f"Кэши очищены: {', '.join(cleared_caches)}",
"cleared_caches": cleared_caches,
"timestamp": datetime.now().isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Ошибка очистки кэша: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Ошибка очистки кэша: {str(e)}"
)
@router.delete("/invalidate/{key}")
async def invalidate_cache_key(
key: str,
cache_name: Optional[str] = None,
current_user: bool = Depends(verify_admin_token)
) -> Dict[str, str]:
"""
Инвалидировать конкретный ключ кэша.
- key: ключ для инвалидации
- cache_name: имя кэша (опционально, если ключ содержит префикс)
"""
try:
# Обработка ключей с префиксом cache_name:key
if ":" in key and not cache_name:
cache_name, clean_key = key.split(":", 1)
else:
clean_key = key
invalidated_keys = []
if cache_name:
# Инвалидируем в конкретном кэше
if cache_name not in active_caches:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Кэш '{cache_name}' не найден"
)
cache = active_caches[cache_name]
if hasattr(cache, 'delete'):
if cache.delete(clean_key):
invalidated_keys.append(key)
else:
# Ищем ключ во всех кэшах
for name, cache in active_caches.items():
if hasattr(cache, 'delete'):
if cache.delete(clean_key):
invalidated_keys.append(f"{name}:{clean_key}")
if not invalidated_keys:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Ключ '{key}' не найден в кэшах"
)
return {
"status": "success",
"message": f"Ключи инвалидированы: {', '.join(invalidated_keys)}",
"invalidated_keys": invalidated_keys,
"timestamp": datetime.now().isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Ошибка инвалидации ключа кэша: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Ошибка инвалидации ключа: {str(e)}"
)
@router.get("/health", response_model=CacheHealth)
async def cache_health_check(
current_user: bool = Depends(verify_admin_token)
) -> CacheHealth:
"""
Проверка здоровья кэша системы.
Проверяет:
- Доступность кэшей
- Использование памяти
- Коэффициент попаданий
- Время отклика
"""
try:
checks = {}
health_status = "healthy"
# Проверка доступности кэшей
for cache_name, cache in active_caches.items():
try:
if hasattr(cache, 'get_stats'):
stats = cache.get_stats()
checks[cache_name] = {
"status": "available",
"keys": stats["total_keys"],
"memory_mb": round(stats["memory_usage_mb"], 2)
}
else:
checks[cache_name] = {"status": "available", "note": "Redis адаптер"}
except Exception as e:
checks[cache_name] = {"status": "error", "error": str(e)}
health_status = "degraded"
# Проверка метрик производительности
total_requests = cache_metrics.hit_count + cache_metrics.miss_count
if cache_metrics.hit_rate < 0.5:
checks["hit_rate"] = {
"status": "warning",
"value": cache_metrics.hit_rate,
"message": "Низкий коэффициент попаданий"
}
if health_status == "healthy":
health_status = "degraded"
else:
checks["hit_rate"] = {
"status": "ok",
"value": cache_metrics.hit_rate
}
# Проверка времени отклика
if cache_metrics.avg_response_time > 100: # больше 100ms
checks["response_time"] = {
"status": "warning",
"avg_ms": round(cache_metrics.avg_response_time, 2),
"message": "Высокое время отклика"
}
if health_status == "healthy":
health_status = "degraded"
else:
checks["response_time"] = {
"status": "ok",
"avg_ms": round(cache_metrics.avg_response_time, 2)
}
# Проверка использования памяти
process = psutil.Process()
memory_percent = process.memory_percent()
if memory_percent > 80:
checks["memory"] = {
"status": "warning",
"percent": round(memory_percent, 2),
"message": "Высокое использование памяти"
}
if health_status == "healthy":
health_status = "degraded"
else:
checks["memory"] = {
"status": "ok",
"percent": round(memory_percent, 2)
}
uptime_seconds = time.time() - cache_metrics.start_time
return CacheHealth(
status=health_status,
checks=checks,
uptime_seconds=uptime_seconds
)
except Exception as e:
logger.error(f"Ошибка проверки здоровья кэша: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Ошибка проверки здоровья: {str(e)}"
)
@router.get("/key/{key}", response_model=Optional[CacheKeyInfo])
async def get_cache_key_info(
key: str,
current_user: bool = Depends(verify_admin_token)
) -> Optional[CacheKeyInfo]:
"""
Получить подробную информацию о конкретном ключе кэша.
"""
try:
# Обработка ключей с префиксом cache_name:key
if ":" in key:
cache_name, clean_key = key.split(":", 1)
if cache_name not in active_caches:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Кэш '{cache_name}' не найден"
)
cache = active_caches[cache_name]
if hasattr(cache, 'get_key'):
key_info = cache.get_key(clean_key)
if key_info:
key_info.key = key # Возвращаем полный ключ с префиксом
return key_info
else:
# Ищем ключ во всех кэшах
for cache_name, cache in active_caches.items():
if hasattr(cache, 'get_key'):
key_info = cache.get_key(key)
if key_info:
key_info.key = f"{cache_name}:{key_info.key}"
return key_info
return None
except HTTPException:
raise
except Exception as e:
logger.error(f"Ошибка получения информации о ключе: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Ошибка получения информации о ключе: {str(e)}"
)
@router.get("/list")
async def list_caches(
current_user: bool = Depends(verify_admin_token)
) -> Dict[str, Dict[str, Any]]:
"""
Получить список всех доступных кэшей системы.
"""
try:
cache_list = {}
for cache_name, cache in active_caches.items():
if hasattr(cache, 'get_stats'):
cache_list[cache_name] = cache.get_stats()
else:
cache_list[cache_name] = {
"name": cache_name,
"type": "redis",
"connection": getattr(cache, 'connection_string', 'unknown')
}
return cache_list
except Exception as e:
logger.error(f"Ошибка получения списка кэшей: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Ошибка получения списка кэшей: {str(e)}"
)
# Инициализация при импорте модуля
initialize_caches()
# Экспорт для использования в других модулях
__all__ = [
"router",
"cache_middleware",
"CacheStats",
"CacheHealth",
"CacheKeyInfo",
"MemoryCache",
"RedisCache",
"cache_metrics"
]