sudo-iのBlog

  • 🍟首页
  • 🍊目录
    • 技术分享
    • vps教程
    • 软件分享
    • 干货分享
  • 🍎链接
  • 🍓工具
    • 🌽IP路由追踪
    • 域名被墙检测
    • KMS激活
    • 域名whois查询
  • 🍕联系
  • 🍌登录
Sudo-i
关注互联网,生活,音乐,乐此不疲
  1. 首页
  2. 干货分享
  3. 正文

缓存策略详解:从基础到高级的完整实践指南

7 3 月, 2026 49点热度 0人点赞 0条评论

# 缓存策略详解:从基础到高级的完整实践指南

在高性能系统设计中,缓存是提升响应速度、降低数据库负载的关键技术。本文将深入探讨各种缓存策略,从基础概念到高级实践,帮助你构建高效的缓存系统。

## 一、缓存的核心价值

缓存的本质是用空间换时间,将频繁访问的数据存储在更快的存储介质中。合理的缓存策略可以带来:

- **响应速度提升**:从毫秒级降至微秒级
- **数据库负载降低**:减少 80% 以上的读请求
- **系统吞吐量增加**:支持更高的并发量
- **成本优化**:减少数据库实例规模

## 二、常见缓存策略

### 1. Cache-Aside(旁路缓存)

最经典的缓存模式,应用层负责缓存管理:

```python
import redis
import json

class CacheAsidePattern:
def __init__(self, redis_client, db_client):
self.cache = redis_client
self.db = db_client

def get_user(self, user_id):
# 先读缓存
cache_key = f"user:{user_id}"
cached_data = self.cache.get(cache_key)

if cached_data:
return json.loads(cached_data)

# 缓存未命中,查数据库
user_data = self.db.query("SELECT * FROM users WHERE id = %s", user_id)

if user_data:
# 写入缓存,设置过期时间
self.cache.setex(cache_key, 3600, json.dumps(user_data))

return user_data

def update_user(self, user_id, data):
# 先更新数据库
self.db.execute("UPDATE users SET %s WHERE id = %s", data, user_id)

# 删除缓存(下次读取时重建)
cache_key = f"user:{user_id}"
self.cache.delete(cache_key)
```

**适用场景**:读多写少的场景,如用户信息、商品详情。

### 2. Write-Through(写穿透)

写入操作同步更新缓存和数据库:

```python
class WriteThroughCache:
def __init__(self, cache_client, db_client):
self.cache = cache_client
self.db = db_client

def update_product(self, product_id, data):
# 同时写入缓存和数据库
self.db.execute("UPDATE products SET %s WHERE id = %s", data, product_id)
self.cache.set(f"product:{product_id}", json.dumps(data))
```

**适用场景**:对数据一致性要求高的场景。

### 3. Write-Behind(写回)

先写缓存,异步批量写入数据库:

```python
import asyncio
from collections import defaultdict

class WriteBehindCache:
def __init__(self, cache_client, db_client, batch_size=100):
self.cache = cache_client
self.db = db_client
self.batch_size = batch_size
self.write_queue = defaultdict(dict)

async def update_async(self, key, data):
# 先写缓存
self.cache.set(key, json.dumps(data))

# 加入写队列
self.write_queue[key[:key.rfind(":")]][key] = data

# 批量写入数据库
if len(self.write_queue) >= self.batch_size:
await self._flush_to_db()

async def _flush_to_db(self):
for table, records in self.write_queue.items():
await self.db.batch_update(table, records)
self.write_queue.clear()
```

**适用场景**:写操作频繁、可接受短暂延迟的场景,如计数器、日志。

## 三、缓存过期策略

### 1. TTL(Time To Live)

为每个缓存项设置生存时间:

```python
# Redis 设置 TTL
redis_client.setex("session:user123", 1800, session_data) # 30 分钟过期

# 支持相对过期和绝对过期
redis_client.set("config:feature_flags", config_json, ex=3600) # 1 小时
redis_client.set("report:daily", report_data, px=86400000) # 24 小时(毫秒)
```

### 2. LRU(Least Recently Used)

淘汰最近最少使用的数据:

```python
from collections import OrderedDict

class LRUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = OrderedDict()

def get(self, key):
if key not in self.cache:
return None
self.cache.move_to_end(key) # 标记为最近使用
return self.cache[key]

def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False) # 淘汰最旧的
```

### 3. LFU(Least Frequently Used)

淘汰访问频率最低的数据:

```python
from collections import defaultdict

class LFUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = {}
self.freq = defaultdict(int)

def get(self, key):
if key not in self.cache:
return None
self.freq[key] += 1
return self.cache[key]

def put(self, key, value):
if len(self.cache) >= self.capacity and key not in self.cache:
# 找到频率最低的 key
min_freq_key = min(self.freq, key=self.freq.get)
del self.cache[min_freq_key]
del self.freq[min_freq_key]

self.cache[key] = value
self.freq[key] += 1
```

## 四、缓存穿透、击穿、雪崩解决方案

### 1. 缓存穿透(查询不存在的数据)

```python
def get_with_bloom_filter(key):
# 布隆过滤器预判
if not bloom_filter.contains(key):
return None # 肯定不存在

cached = cache.get(key)
if cached:
return cached

data = db.query(key)
if data:
cache.setex(key, 3600, data)
else:
# 缓存空值,防止重复查询
cache.setex(key, 300, "NULL")

return data
```

### 2. 缓存击穿(热点 key 过期)

```python
import threading

def get_with_lock(key):
cached = cache.get(key)
if cached:
return cached

# 分布式锁
lock_key = f"lock:{key}"
if cache.set(lock_key, "1", nx=True, ex=10):
try:
# 双重检查
cached = cache.get(key)
if cached:
return cached

data = db.query(key)
cache.setex(key, 3600, data)
return data
finally:
cache.delete(lock_key)
else:
# 等待后重试
time.sleep(0.1)
return get_with_lock(key)
```

### 3. 缓存雪崩(大量 key 同时过期)

```python
import random

def set_with_jitter(key, value, base_ttl):
# 添加随机过期时间,避免同时过期
jitter = random.randint(0, 300) # 0-5 分钟随机
cache.setex(key, base_ttl + jitter, value)

# 批量设置时使用不同的过期时间
for item in items:
base_ttl = 3600
set_with_jitter(f"item:{item.id}", item.data, base_ttl)
```

## 五、多级缓存架构

结合本地缓存 + 分布式缓存:

```python
from cachetools import TTLCache

class MultiLevelCache:
def __init__(self, local_size=1000, redis_client=None):
self.local = TTLCache(maxsize=local_size, ttl=60) # 本地缓存 1 分钟
self.redis = redis_client # 分布式缓存

def get(self, key):
# 先查本地缓存
if key in self.local:
return self.local[key]

# 再查 Redis
if self.redis:
data = self.redis.get(key)
if data:
self.local[key] = data # 回填本地缓存
return data

return None

def set(self, key, value, ttl=3600):
self.local[key] = value
if self.redis:
self.redis.setex(key, ttl, value)
```

## 六、实战建议

1. **缓存粒度**:根据业务场景选择对象级或字段级缓存
2. **监控告警**:监控命中率、内存使用、过期键数量
3. **预热策略**:系统启动时预加载热点数据
4. **降级方案**:缓存故障时自动降级到数据库
5. **一致性保障**:根据业务容忍度选择合适的一致性策略

## 总结

缓存是提升系统性能的核心技术,但没有银弹。选择合适的缓存策略需要考虑:

- 数据读写比例
- 一致性要求
- 内存成本
- 业务容忍度

建议从简单的 Cache-Aside 模式开始,根据实际监控数据逐步优化。记住:**过早优化是万恶之源**,先让系统跑起来,再根据瓶颈针对性优化。

无关联文章

本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: 暂无
最后更新:7 3 月, 2026

李炫炫

这个人很懒,什么都没留下

点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

COPYRIGHT © 2025 sudo-iのBlog. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

鲁ICP备2024054662号

鲁公网安备37108102000450号