1-Intro
场景: 一个用户行为系统,
like,fav等等都会涉及到计数系统.
目标:
- 海量数据: 假设存量千亿的行为数据
- 写入
qps: 百万 - 读取
qps: 亿
一个用户行为大致大致需要 100Bytes, 算上索引假设 150Bytes
userIditemIditemOwnerIdreleationTypeeventAtstatus- ..
上面的是核心写入模型:
- 存量数据粗估是:
15T左右, 假设一个数据中心至少3副本, 就是45T - 增量的数据, 假设1天平均活跃时间12个小时, 1天增量就是
43G
基于上面的写入模型构建2个读取模型, 假设有2种事件, 点赞和收藏
-
itemCounter: 点赞数和收藏数itemIdlikeCntfavCnt
-
itemLatestTime: 最后收藏时间和最后点赞时间itemIdlatestLikeAtlatestFavAt
Tips
我们为写操作和读操作使用不同的模型,是比较典型的
CQRS架构风格, 在高性能设计中CQRS扩展性有明显优势
Question:
Mysql是否能顶的住写
Mysql 能提供的:
InnoDbB+树的索引- 主键上有自适应
Hash
TokuDb: 使用Fractal Tree, 也被称为T-Tree的数据结构来顶.- 单机写入能顶
1W 到 5W大概这个级别, 有报道说配合优化, 批处理能突破10W
- 单机写入能顶
像 Facebook , 根据他们的 Blog, 应该还能提供 LSM 结构的 RocksDB 引擎, 对写入也非常友好.
我们使用用 Mysql 分片 + Toku引擎 或者 RocksDB 引擎 来抗写, 然后基于 CDC connector 使用多级缓存来抗读, 是一个 传统的, 有一定可行性的方案 .
思路.
写的大致思路:
- 使用事件溯源的设计模式, 业务层写入到
mq则结束, 例如使用Kafka. 这应该能很轻松的顶住百万QPS的写入操作 - 应用层面, 这里可以使用熟悉的语言处理上面的
mq消息. - 瓶颈大概率集中在 存储的数据库这里了.
- 批处理能有效的进行事件压缩
- 上面的
Mysql是能顶住的 - 这里还是选择使用
ScyllaDB作为主存, 理由是更容易做分布式和异地多活, 而且ScyllaDB的内置压缩算法比较丰富,可选,能有效的节省存储成本
读的大致思路:
- 这里考虑使用
Flink来构建数据管道. 当然也可以在上面的业务代码中直接处理.flink本身有很强的扩展性, 无论是性能还是功能source: 可以基于 ScyllaDb cdc 转Kafka, 也可以在上面的应用那里直接发送Kafka, 使用eventAt作为处理时间transform: 按照itemId进行聚合,incrdecr或者更新 最后更新时间sink:counter表和cache
- 分布式缓存, 优先考虑
mcrouter或者Redis如果选用Redis, 数据结构可以是itemId为Key的Hash,HashKey是行为类型
- 要支持这个读流量 要上多级缓存, 应用层也要缓存
- 多级缓存估计还不够,要做 多数据中心分担流量, 目前的技术选型中,
ScyllaDBRedis等等都比较容易做 - 可以考虑使用边缘计算,
AWS的Lambda@Edge, 配合Redis的多数据中心, 进一步提升性能
2-Implementation
多数据中心: 假设有3个数据中心北京1个
BJ, 上海2个SH1SH2实现异地多活中 最简单的 2地3中心
cb.bj.comcb.sh.com
入由可以由边缘计算或者客户端层实现, 一个简单的方案就是 客户端 用一个定时器 探测离哪个中心更近就使用哪个 做出入口.
每个数据中心都能独立运行,是一个完整的
Deployment Unit
-
数据层例如 ,
ScyllaDB可以参考 Scylla Multiple Data Center 只需要改 2个配置, 一个RACK一个DC, 注意设置支持多MutlpleDC的Snitch复制策略即可,不麻烦 -
应用层要注意要指明希望去的
DC, 一致性由XXX改为LOCAL_XXX, 例如LOCAL_QUORUM或者LOCAL_ONE, 可以参考 Consistent Level
scylladb schema
CREATE TABLE t_act_like
(
user_id bigint,
state int,
item_id text,
event_at bigint,
PRIMARY KEY (user_id, item_id)
);
CREATE TABLE t_like_counter
(
item_id bigint,
count counter,
PRIMARY KEY (item_id)
);
CREATE TABLE t_like_at
(
item_id bigint,
last_like_at bigint,
PRIMARY KEY (item_id)
); 消费者伪代码
fun batchHandleLikeEvent(events: List<LikeDto>) {
//1. 按照 userId -> itemId 分组,并且取到 eventAt 时间最新的那一条
val compressed: List<LikeDto> = compressEvents(events)
//2. 这里可以 并发处理多个不同的 Dto
compressed.forEach {handleLikeEvent(it)}
}
fun handleLikeEvent(event: LikeDto): Boolean {
// 1. 查询 当前 event 是否是延迟事件,这样能实现有序消费, 不怕 kafka 来的时候乱序
val eventInDb = findOne(event.userId, event.itemId);
if (eventInDb != null && eventInDb.eventAt >= event.eventAt) {
return false
}
// 2. 如果 state 一致, 则完全不用处理.
if (eventInDb != null && eventInDb.state = event.state) {
return false
}
// 3. 先动数据库
likeDao.save(event)
// 4. 可选操作1: 如果担心延迟太高,提前修改缓存也是可以的, 最后基于 counter 表实现最终一致性
// optionalUpdateItemCounter(event)
// 5. 可选操作2: 这里可以发送一个 确认的点赞或者取消点赞数据, 或者直接 cdc 用来改 counter 并修改 cache
optionalSendConfirmedEvent(event)
// 6. 可选操作3: 这里还可以 直接异步的修改 counter
optionalIncrItemCounter(event)
}
- 我们这里 最关键的是修改了
event, 也就是primary, 后续的不管是 计数器 还是缓存,都走这里进行 补偿, 实现最终一致性