TraceStudio-dist/server/app/core/cache_manager.py

475 lines
15 KiB
Python
Raw Permalink Normal View History

2026-01-13 16:41:31 +08:00
"""
TraceStudio 缓存管理系统 (v2.0)
支持内存缓存和磁盘缓存 TTL 和淘汰策略
"""
import hashlib
import json
import logging
import time
from collections import OrderedDict
from enum import Enum
from pathlib import Path
from threading import Lock
from typing import Any, Dict, Optional, Tuple
class CacheEvictionPolicy(Enum):
    """Eviction strategies a cache can apply when it has to drop an entry."""

    LRU = "lru"    # least recently used
    FIFO = "fifo"  # first in, first out
    LFU = "lfu"    # least frequently used
class MemoryCache:
    """In-memory cache with optional TTL expiry and a selectable eviction policy.

    Entries live in an ``OrderedDict``; two side tables keep the write time
    and an access counter per key. Every public method takes the instance
    lock, so one instance can be shared between threads.
    """

    def __init__(self, max_size: int = 1000, ttl: Optional[int] = None,
                 policy: CacheEvictionPolicy = CacheEvictionPolicy.LRU):
        """
        Create an empty cache.

        Args:
            max_size: Entry count at which a write first evicts one entry.
            ttl: Lifetime of an entry in seconds (compared against
                ``time.time()``); ``None`` disables expiry.
            policy: Strategy used to choose the entry to evict.
        """
        self.max_size = max_size
        self.ttl = ttl
        self.policy = policy
        self._cache: OrderedDict = OrderedDict()
        self._timestamps: Dict[str, float] = {}
        self._access_count: Dict[str, int] = {}  # consulted by LFU eviction
        self._lock = Lock()

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under ``key``, or ``None`` if absent or expired.

        An expired entry is removed as a side effect. A stored ``None`` cannot
        be told apart from a miss.
        """
        with self._lock:
            if self._is_expired(key):
                self._discard(key)
                return None
            if key not in self._cache:
                return None
            # A read must not change the eviction order under FIFO; the
            # previous unconditional move_to_end() turned FIFO into LRU.
            # Under LFU the move makes ties fall on the least recently used key.
            if self.policy != CacheEvictionPolicy.FIFO:
                self._cache.move_to_end(key)
            self._access_count[key] = self._access_count.get(key, 0) + 1
            return self._cache[key]

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key``, evicting one entry first if the cache is full.

        Rewriting an existing key re-inserts it at the end of the order and
        resets its timestamp and access counter.
        """
        with self._lock:
            # Drop the old slot first so an update never triggers an eviction
            # on its own and the key ends up at the tail.
            self._cache.pop(key, None)
            if len(self._cache) >= self.max_size:
                self._evict_one()
            self._cache[key] = value
            self._timestamps[key] = time.time()
            self._access_count[key] = 1

    def delete(self, key: str) -> bool:
        """Remove ``key``; return ``True`` if it was present."""
        with self._lock:
            if key not in self._cache:
                return False
            self._discard(key)
            return True

    def clear(self):
        """Remove every entry together with its bookkeeping."""
        with self._lock:
            self._cache.clear()
            self._timestamps.clear()
            self._access_count.clear()

    def get_stats(self) -> Dict:
        """Return a snapshot of size, configuration and access counters.

        ``hit_count`` is the sum of the per-entry access counters of the
        entries currently tracked. Each counter starts at 1 when the entry is
        written, so the figure includes writes as well as reads.
        """
        with self._lock:
            return {
                "current_size": len(self._cache),
                "max_size": self.max_size,
                "policy": self.policy.value,
                "ttl": self.ttl,
                "hit_count": sum(self._access_count.values())
            }

    def _is_expired(self, key: str) -> bool:
        """Tell whether ``key`` has outlived the TTL (always ``False`` without one)."""
        if self.ttl is None or key not in self._timestamps:
            return False
        return time.time() - self._timestamps[key] > self.ttl

    def _discard(self, key: str) -> None:
        """Drop ``key`` from the value table and both bookkeeping tables."""
        self._cache.pop(key, None)
        self._timestamps.pop(key, None)
        self._access_count.pop(key, None)

    def _evict_one(self):
        """Evict a single entry according to ``self.policy``; no-op when empty."""
        if not self._cache:
            return
        if self.policy == CacheEvictionPolicy.LFU:
            # Choose among keys that are really stored. The counter table can
            # briefly hold a key that set() has just popped, and picking that
            # one used to raise KeyError.
            victim = min(self._cache, key=lambda k: self._access_count.get(k, 0))
        else:
            # LRU and FIFO both evict the head of the ordered dict; they differ
            # only in whether get() reorders it. Any other policy value falls
            # back to this branch instead of leaving the victim unbound.
            victim = next(iter(self._cache))
        self._discard(victim)
class DiskCache:
    """Disk-backed cache that stores one JSON file per key, with optional TTL.

    Each entry is written to ``<cache_dir>/<md5(key)>.json`` as
    ``{"value": ..., "timestamp": ...}``, so values must be JSON-serialisable.
    ``_metadata.json`` in the same directory maps every known key to its write
    time and file path. Every public method takes the instance lock.

    Failures are best-effort: they are logged and the call returns normally.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, cache_dir: Path, ttl: Optional[int] = None):
        """
        Create the cache directory if needed and load existing metadata.

        Args:
            cache_dir: Directory that holds the entry files and the metadata.
            ttl: Lifetime of an entry in seconds (compared against
                ``time.time()``); ``None`` disables expiry.
        """
        self.cache_dir = Path(cache_dir)
        self.ttl = ttl
        self._metadata_file = self.cache_dir / "_metadata.json"
        self._lock = Lock()
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._load_metadata()

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under ``key``, or ``None`` if missing, expired or unreadable."""
        with self._lock:
            if self._is_expired(key):
                self._delete_file(key)
                return None
            cache_file = self._get_cache_file(key)
            try:
                with open(cache_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except FileNotFoundError:
                return None
            except (OSError, ValueError) as exc:
                self._log.warning("DiskCache: cannot read %s: %s", cache_file, exc)
                return None
            if not isinstance(data, dict):
                return None
            # An entry file without a metadata record (e.g. the metadata file
            # was lost or unreadable) would otherwise never expire; fall back
            # to the timestamp stored inside the entry itself.
            if self.ttl is not None and key not in self._metadata:
                stamp = data.get('timestamp', 0)
                if not isinstance(stamp, (int, float)) or time.time() - stamp > self.ttl:
                    self._delete_file(key)
                    return None
            return data.get('value')

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key``.

        The payload is serialised before any file is touched and then swapped
        into place, so a value that cannot be serialised (or a failed write)
        leaves the previous entry intact instead of truncating it.
        """
        with self._lock:
            cache_file = self._get_cache_file(key)
            now = time.time()
            try:
                payload = json.dumps({'value': value, 'timestamp': now})
            except (TypeError, ValueError, RecursionError) as exc:
                self._log.warning("DiskCache: value for key %r is not JSON-serialisable: %s", key, exc)
                return
            try:
                cache_file.parent.mkdir(parents=True, exist_ok=True)
                self._write_text_atomic(cache_file, payload)
            except OSError as exc:
                self._log.warning("DiskCache: cannot write %s: %s", cache_file, exc)
                return
            self._metadata[key] = {
                'timestamp': now,
                'file': str(cache_file)
            }
            self._save_metadata()

    def delete(self, key: str) -> bool:
        """Remove ``key``; return ``True`` if a record or file was removed."""
        with self._lock:
            return self._delete_file(key)

    def clear(self):
        """Delete every ``*.json`` entry file in the cache directory and reset the metadata."""
        with self._lock:
            for cache_file in self.cache_dir.glob("*.json"):
                if cache_file == self._metadata_file:
                    continue
                try:
                    cache_file.unlink()
                except OSError as exc:
                    self._log.warning("DiskCache: cannot remove %s: %s", cache_file, exc)
            self._metadata.clear()
            self._save_metadata()

    def cleanup_expired(self) -> int:
        """Remove all entries whose metadata timestamp is past the TTL; return how many."""
        with self._lock:
            if self.ttl is None:
                return 0
            expired_keys = [key for key in self._metadata if self._is_expired(key)]
            deleted = sum(1 for key in expired_keys if self._remove_entry(key))
            if deleted:
                # One metadata write for the whole batch instead of one per key.
                self._save_metadata()
            return deleted

    def get_stats(self) -> Dict:
        """Return entry count, on-disk size of the entry files, directory and TTL."""
        with self._lock:
            total_size = sum(
                f.stat().st_size
                for f in self.cache_dir.glob("*.json")
                if f != self._metadata_file
            )
            return {
                "items_count": len(self._metadata),
                "total_size_bytes": total_size,
                "total_size_mb": round(total_size / (1024 * 1024), 2),
                "cache_dir": str(self.cache_dir),
                "ttl": self.ttl
            }

    def _get_cache_file(self, key: str) -> Path:
        """Map ``key`` to its entry file; the MD5 hex digest is only a file name."""
        hashed = hashlib.md5(key.encode()).hexdigest()
        return self.cache_dir / f"{hashed}.json"

    def _is_expired(self, key: str) -> bool:
        """Tell whether the metadata record of ``key`` is older than the TTL."""
        if self.ttl is None or key not in self._metadata:
            return False
        timestamp = self._metadata[key].get('timestamp', 0)
        return time.time() - timestamp > self.ttl

    def _remove_entry(self, key: str) -> bool:
        """Remove the entry file and metadata record of ``key`` without saving metadata.

        Returns ``True`` if either existed. If the file exists but cannot be
        removed, the record is kept and ``False`` is returned.
        """
        cache_file = self._get_cache_file(key)
        had_file = True
        try:
            cache_file.unlink()
        except FileNotFoundError:
            had_file = False
        except OSError as exc:
            self._log.warning("DiskCache: cannot remove %s: %s", cache_file, exc)
            return False
        had_record = key in self._metadata
        if had_record:
            del self._metadata[key]
        return had_file or had_record

    def _delete_file(self, key: str) -> bool:
        """Remove ``key`` and persist the metadata; return ``True`` if anything was removed."""
        removed = self._remove_entry(key)
        if removed:
            self._save_metadata()
        return removed

    def _load_metadata(self):
        """Load ``_metadata.json``; start empty if it is missing, unreadable or malformed."""
        self._metadata: Dict[str, Dict[str, Any]] = {}
        try:
            with open(self._metadata_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return
        except (OSError, ValueError) as exc:
            self._log.warning("DiskCache: cannot load metadata from %s: %s", self._metadata_file, exc)
            return
        if not isinstance(loaded, dict):
            # Anything but a JSON object would break later item assignment.
            self._log.warning("DiskCache: ignoring malformed metadata in %s", self._metadata_file)
            return
        # Keep only well-formed records so _is_expired() can rely on dicts.
        self._metadata = {k: v for k, v in loaded.items() if isinstance(v, dict)}

    def _save_metadata(self):
        """Persist the metadata table; failures are logged, not raised."""
        try:
            self._write_text_atomic(self._metadata_file, json.dumps(self._metadata))
        except (OSError, TypeError, ValueError) as exc:
            self._log.warning("DiskCache: cannot save metadata to %s: %s", self._metadata_file, exc)

    @staticmethod
    def _write_text_atomic(target: Path, text: str) -> None:
        """Write ``text`` to a sibling temp file, then rename it over ``target``.

        The temp name ends in ``.tmp`` so the ``*.json`` globs never see it.
        """
        tmp = target.with_name(target.name + ".tmp")
        try:
            with open(tmp, 'w', encoding='utf-8') as f:
                f.write(text)
            tmp.replace(target)
        except OSError:
            try:
                tmp.unlink()
            except OSError:
                pass
            raise
class CacheManager:
    """Class-level facade over one shared memory cache and one shared disk cache.

    All state is stored on the class, so every caller in the process sees the
    same two backends. A backend that has not been initialised is skipped by
    every operation.
    """

    _memory_cache: Optional[MemoryCache] = None
    _disk_cache: Optional[DiskCache] = None
    _lock = Lock()  # guards (re)assignment of the two backends

    @classmethod
    def init_memory_cache(cls, max_size: int = 1000, ttl: Optional[int] = None,
                          policy: CacheEvictionPolicy = CacheEvictionPolicy.LRU):
        """Create the shared memory cache, replacing any previous one."""
        # Construct the MemoryCache instance before acquiring the class-level lock.
        # This avoids a potential deadlock if the MemoryCache constructor
        # (or code it calls) touches CacheManager and attempts to acquire
        # the same class-level lock (threading.Lock is not re-entrant).
        instance = MemoryCache(max_size, ttl, policy)
        with cls._lock:
            cls._memory_cache = instance

    @classmethod
    def init_disk_cache(cls, cache_dir: Path, ttl: Optional[int] = None):
        """Create the shared disk cache, replacing any previous one."""
        # DiskCache constructor performs I/O (mkdir, file reads). Create the
        # instance first, then assign it under the lock to avoid holding the
        # class-level lock during potentially blocking I/O operations.
        instance = DiskCache(cache_dir, ttl)
        with cls._lock:
            cls._disk_cache = instance

    @classmethod
    def get(cls, key: str, prefer: str = "memory") -> Optional[Any]:
        """
        Look ``key`` up in the preferred backend, then in the other one.

        Args:
            key: Cache key.
            prefer: ``"memory"`` queries the memory cache first; any other
                value queries the disk cache first.

        Returns:
            The first value that is not ``None``, or ``None`` on a miss.
        """
        if prefer == "memory":
            lookup_order = (cls._memory_cache, cls._disk_cache)
        else:
            lookup_order = (cls._disk_cache, cls._memory_cache)
        for backend in lookup_order:
            if backend is None:
                continue
            value = backend.get(key)
            if value is not None:
                return value
        return None

    @classmethod
    def set(cls, key: str, value: Any, storage: str = "memory"):
        """
        Store ``value`` in a single backend.

        Args:
            key: Cache key.
            value: Value to store.
            storage: ``"memory"`` or ``"disk"``. Any other value, or a backend
                that is not initialised, makes the call a no-op.
        """
        if storage == "memory" and cls._memory_cache:
            cls._memory_cache.set(key, value)
        elif storage == "disk" and cls._disk_cache:
            cls._disk_cache.set(key, value)

    @classmethod
    def set_both(cls, key: str, value: Any):
        """Store ``value`` in every initialised backend."""
        if cls._memory_cache:
            cls._memory_cache.set(key, value)
        if cls._disk_cache:
            cls._disk_cache.set(key, value)

    @classmethod
    def delete(cls, key: str, storage: Optional[str] = None) -> bool:
        """
        Remove ``key`` from one backend or from both.

        Args:
            key: Cache key.
            storage: ``"memory"``, ``"disk"``, or ``None`` for both.

        Returns:
            ``True`` if at least one backend removed the key.
        """
        deleted = False
        if storage is None or storage == "memory":
            if cls._memory_cache:
                deleted |= cls._memory_cache.delete(key)
        if storage is None or storage == "disk":
            if cls._disk_cache:
                deleted |= cls._disk_cache.delete(key)
        return deleted

    @classmethod
    def clear(cls, storage: Optional[str] = None):
        """
        Empty one backend or both.

        Args:
            storage: ``"memory"``, ``"disk"``, or ``None`` for both.
        """
        if storage is None or storage == "memory":
            if cls._memory_cache:
                cls._memory_cache.clear()
        if storage is None or storage == "disk":
            if cls._disk_cache:
                cls._disk_cache.clear()

    @classmethod
    def cleanup_expired(cls) -> Tuple[int, int]:
        """
        Purge expired entries from both backends.

        Returns:
            ``(memory_removed, disk_removed)``.
        """
        memory_deleted = 0
        disk_deleted = 0
        memory = cls._memory_cache
        if memory is not None:
            # Previously the memory cache was never purged and this count
            # was always reported as 0.
            memory_deleted = cls._purge_expired_memory(memory)
        if cls._disk_cache:
            disk_deleted = cls._disk_cache.cleanup_expired()
        return memory_deleted, disk_deleted

    @classmethod
    def _purge_expired_memory(cls, cache: "MemoryCache") -> int:
        """Remove every expired entry from ``cache`` and return how many were removed.

        MemoryCache offers no public way to enumerate its keys, so this walks
        its internal tables while holding the cache's own lock.
        NOTE(review): consider moving this into MemoryCache as a public method.
        """
        if cache.ttl is None:
            return 0
        with cache._lock:
            # Snapshot the stale keys first; the tables are mutated below.
            stale = [key for key in cache._cache if cache._is_expired(key)]
            for key in stale:
                cache._cache.pop(key, None)
                cache._timestamps.pop(key, None)
                cache._access_count.pop(key, None)
        return len(stale)

    @classmethod
    def get_stats(cls) -> Dict:
        """Return the statistics of each backend, or ``None`` for one that is not initialised."""
        return {
            "memory": cls._memory_cache.get_stats() if cls._memory_cache else None,
            "disk": cls._disk_cache.get_stats() if cls._disk_cache else None
        }

    @classmethod
    def get_total_size_mb(cls) -> float:
        """Return the cache size in MB.

        Only the disk cache contributes; the memory cache statistics carry no
        byte size.
        """
        total = 0
        if cls._disk_cache:
            stats = cls._disk_cache.get_stats()
            total += stats.get("total_size_mb", 0)
        return round(total, 2)
class NodeCacheAdapter:
    """Wrapper that pins every CacheManager call to one storage mode.

    ``storage_mode`` is passed through unchanged as ``prefer`` / ``storage``;
    CacheManager compares it against ``"memory"`` and ``"disk"``.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, storage_mode: str):
        """Remember the storage mode used by all later calls."""
        self.storage = storage_mode

    def get(self, key: str):
        """Return the cached value for ``key``, or ``None`` on a miss.

        The lookup prefers this adapter's storage, and CacheManager falls back
        to the other backend when the preferred one misses.
        """
        value = CacheManager.get(key, prefer=self.storage)
        # Debug-level logging replaces the former print(); the cached value
        # itself is not logged, only whether the lookup hit.
        self._log.debug(
            "NodeCacheAdapter.get: key=%s, storage=%s, hit=%s",
            key, self.storage, value is not None,
        )
        return value

    def set(self, key: str, value: Dict):
        """Store ``value`` under ``key`` and return what ``get(key)`` reads back."""
        self._log.debug("NodeCacheAdapter.set: key=%s, storage=%s", key, self.storage)
        CacheManager.set(key, value, storage=self.storage)
        return self.get(key)

    def delete(self, key: str) -> bool:
        """Remove ``key`` from this adapter's storage; return ``True`` if it was removed."""
        return CacheManager.delete(key, storage=self.storage)

    def clear(self):
        """Empty this adapter's storage."""
        return CacheManager.clear(storage=self.storage)