# 缓存策略详解:从基础到高级的完整实践指南
在高性能系统设计中,缓存是提升响应速度、降低数据库负载的关键技术。本文将深入探讨各种缓存策略,从基础概念到高级实践,帮助你构建高效的缓存系统。
## 一、缓存的核心价值
缓存的本质是用空间换时间,将频繁访问的数据存储在更快的存储介质中。合理的缓存策略可以带来:
- **响应速度提升**:从毫秒级降至微秒级
- **数据库负载降低**:减少 80% 以上的读请求
- **系统吞吐量增加**:支持更高的并发量
- **成本优化**:减少数据库实例规模
## 二、常见缓存策略
### 1. Cache-Aside(旁路缓存)
最经典的缓存模式,应用层负责缓存管理:
```python
import redis
import json
class CacheAsidePattern:
def __init__(self, redis_client, db_client):
self.cache = redis_client
self.db = db_client
def get_user(self, user_id):
# 先读缓存
cache_key = f"user:{user_id}"
cached_data = self.cache.get(cache_key)
if cached_data:
return json.loads(cached_data)
# 缓存未命中,查数据库
user_data = self.db.query("SELECT * FROM users WHERE id = %s", user_id)
if user_data:
# 写入缓存,设置过期时间
self.cache.setex(cache_key, 3600, json.dumps(user_data))
return user_data
def update_user(self, user_id, data):
# 先更新数据库
self.db.execute("UPDATE users SET %s WHERE id = %s", data, user_id)
# 删除缓存(下次读取时重建)
cache_key = f"user:{user_id}"
self.cache.delete(cache_key)
```
**适用场景**:读多写少的场景,如用户信息、商品详情。
### 2. Write-Through(写穿透)
写入操作同步更新缓存和数据库:
```python
class WriteThroughCache:
def __init__(self, cache_client, db_client):
self.cache = cache_client
self.db = db_client
def update_product(self, product_id, data):
# 同时写入缓存和数据库
self.db.execute("UPDATE products SET %s WHERE id = %s", data, product_id)
self.cache.set(f"product:{product_id}", json.dumps(data))
```
**适用场景**:对数据一致性要求高的场景。
### 3. Write-Behind(写回)
先写缓存,异步批量写入数据库:
```python
import asyncio
from collections import defaultdict
class WriteBehindCache:
def __init__(self, cache_client, db_client, batch_size=100):
self.cache = cache_client
self.db = db_client
self.batch_size = batch_size
self.write_queue = defaultdict(dict)
async def update_async(self, key, data):
# 先写缓存
self.cache.set(key, json.dumps(data))
# 加入写队列
self.write_queue[key[:key.rfind(":")]][key] = data
# 批量写入数据库
if len(self.write_queue) >= self.batch_size:
await self._flush_to_db()
async def _flush_to_db(self):
for table, records in self.write_queue.items():
await self.db.batch_update(table, records)
self.write_queue.clear()
```
**适用场景**:写操作频繁、可接受短暂延迟的场景,如计数器、日志。
## 三、缓存过期策略
### 1. TTL(Time To Live)
为每个缓存项设置生存时间:
```python
# Redis 设置 TTL
redis_client.setex("session:user123", 1800, session_data) # 30 分钟过期
# 支持相对过期和绝对过期
redis_client.set("config:feature_flags", config_json, ex=3600) # 1 小时
redis_client.set("report:daily", report_data, px=86400000) # 24 小时(毫秒)
```
### 2. LRU(Least Recently Used)
淘汰最近最少使用的数据:
```python
from collections import OrderedDict
class LRUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = OrderedDict()
def get(self, key):
if key not in self.cache:
return None
self.cache.move_to_end(key) # 标记为最近使用
return self.cache[key]
def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False) # 淘汰最旧的
```
### 3. LFU(Least Frequently Used)
淘汰访问频率最低的数据:
```python
from collections import defaultdict
class LFUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = {}
self.freq = defaultdict(int)
def get(self, key):
if key not in self.cache:
return None
self.freq[key] += 1
return self.cache[key]
def put(self, key, value):
if len(self.cache) >= self.capacity and key not in self.cache:
# 找到频率最低的 key
min_freq_key = min(self.freq, key=self.freq.get)
del self.cache[min_freq_key]
del self.freq[min_freq_key]
self.cache[key] = value
self.freq[key] += 1
```
## 四、缓存穿透、击穿、雪崩解决方案
### 1. 缓存穿透(查询不存在的数据)
```python
def get_with_bloom_filter(key):
# 布隆过滤器预判
if not bloom_filter.contains(key):
return None # 肯定不存在
cached = cache.get(key)
if cached:
return cached
data = db.query(key)
if data:
cache.setex(key, 3600, data)
else:
# 缓存空值,防止重复查询
cache.setex(key, 300, "NULL")
return data
```
### 2. 缓存击穿(热点 key 过期)
```python
import threading
def get_with_lock(key):
cached = cache.get(key)
if cached:
return cached
# 分布式锁
lock_key = f"lock:{key}"
if cache.set(lock_key, "1", nx=True, ex=10):
try:
# 双重检查
cached = cache.get(key)
if cached:
return cached
data = db.query(key)
cache.setex(key, 3600, data)
return data
finally:
cache.delete(lock_key)
else:
# 等待后重试
time.sleep(0.1)
return get_with_lock(key)
```
### 3. 缓存雪崩(大量 key 同时过期)
```python
import random
def set_with_jitter(key, value, base_ttl):
# 添加随机过期时间,避免同时过期
jitter = random.randint(0, 300) # 0-5 分钟随机
cache.setex(key, base_ttl + jitter, value)
# 批量设置时使用不同的过期时间
for item in items:
base_ttl = 3600
set_with_jitter(f"item:{item.id}", item.data, base_ttl)
```
## 五、多级缓存架构
结合本地缓存 + 分布式缓存:
```python
from cachetools import TTLCache
class MultiLevelCache:
def __init__(self, local_size=1000, redis_client=None):
self.local = TTLCache(maxsize=local_size, ttl=60) # 本地缓存 1 分钟
self.redis = redis_client # 分布式缓存
def get(self, key):
# 先查本地缓存
if key in self.local:
return self.local[key]
# 再查 Redis
if self.redis:
data = self.redis.get(key)
if data:
self.local[key] = data # 回填本地缓存
return data
return None
def set(self, key, value, ttl=3600):
self.local[key] = value
if self.redis:
self.redis.setex(key, ttl, value)
```
## 六、实战建议
1. **缓存粒度**:根据业务场景选择对象级或字段级缓存
2. **监控告警**:监控命中率、内存使用、过期键数量
3. **预热策略**:系统启动时预加载热点数据
4. **降级方案**:缓存故障时自动降级到数据库
5. **一致性保障**:根据业务容忍度选择合适的一致性策略
## 总结
缓存是提升系统性能的核心技术,但没有银弹。选择合适的缓存策略需要考虑:
- 数据读写比例
- 一致性要求
- 内存成本
- 业务容忍度
建议从简单的 Cache-Aside 模式开始,根据实际监控数据逐步优化。记住:**过早优化是万恶之源**,先让系统跑起来,再根据瓶颈针对性优化。
文章评论